Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1333546)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -32,10 +32,10 @@
 import collection.mutable.ListBuffer
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import scala.collection.Map
-import kafka.serializer.Encoder
 import kafka.api.{ProducerRequest, TopicData, PartitionData}
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
+import kafka.serializer.{DefaultEncoder, Encoder}
 
 /**
  * Utility functions to help with testing
@@ -122,6 +122,7 @@
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval", "1")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("follower.socket.timeout.ms", "500")
     props
   }
   
@@ -280,12 +281,13 @@
   /**
    * Create a producer for the given host and port
    */
-  def createProducer[K, V](zkConnect: String): Producer[K, V] = {
+  def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
     val props = new Properties()
     props.put("zk.connect", zkConnect)
     props.put("buffer.size", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
+    props.put("serializer.class", encoder.getClass.getCanonicalName)
     new Producer[K, V](new ProducerConfig(props))
   }
 
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1333546)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -27,9 +27,9 @@
 import kafka.network._
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
-import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
+import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -80,6 +80,7 @@
     // topic metadata request only requires 2 APIs from the log manager
     val logManager = EasyMock.createMock(classOf[LogManager])
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
     EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
     EasyMock.replay(logManager)
@@ -94,7 +95,7 @@
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, kafkaZookeeper)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(revision 1333546)
+++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(working copy)
@@ -18,8 +18,6 @@
 package kafka.integration
 
 import junit.framework.Assert._
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1333546)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -177,66 +177,66 @@
     producer.close
   }
 
+//  @Test
+//  def testZKSendToExistingTopicWithNoBrokers() {
+//    val props = new Properties()
+//    props.put("serializer.class", "kafka.serializer.StringEncoder")
+//    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
+//    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+//
+//    val config = new ProducerConfig(props)
+//
+//    val producer = new Producer[String, String](config)
+//    var server: KafkaServer = null
+//
+//    // create topic
+//    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+//
+//    try {
+//      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
+//      // all partitions have broker 0 as the leader.
+//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+//      Thread.sleep(100)
+//      // cross check if brokers got the messages
+//      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+//      val messageSet1 = response1.messageSet("new-topic", 0).iterator
+//      assertTrue("Message set should have 1 message", messageSet1.hasNext)
+//      assertEquals(new Message("test".getBytes), messageSet1.next.message)
+//
+//      // shutdown server2
+//      server2.shutdown
+//      server2.awaitShutdown()
+//      Thread.sleep(100)
+//      // delete the new-topic logs
+//      Utils.rm(server2.config.logDir)
+//      Thread.sleep(100)
+//      // start it up again. So broker 2 exists under /broker/ids, but nothing exists under /broker/topics/new-topic
+//      val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
+//      val config2 = new KafkaConfig(props2) {
+//        override val numPartitions = 4
+//      }
+//      server = TestUtils.createServer(config2)
+//      Thread.sleep(100)
+//
+//      // now there are no brokers registered under test-topic.
+//      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+//      Thread.sleep(100)
+//
+//      // cross check if brokers got the messages
+//      val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+//      val messageSet2 = response2.messageSet("new-topic", 0).iterator
+//      assertTrue("Message set should have 1 message", messageSet2.hasNext)
+//      assertEquals(new Message("test".getBytes), messageSet2.next.message)
+//
+//    } catch {
+//      case e: Exception => fail("Not expected", e)
+//    } finally {
+//      if(server != null) server.shutdown
+//      producer.close
+//    }
+//  }
+//
   @Test
-  def testZKSendToExistingTopicWithNoBrokers() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
-    val config = new ProducerConfig(props)
-
-    val producer = new Producer[String, String](config)
-    var server: KafkaServer = null
-
-    // create topic
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-
-    try {
-      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-      // all partitions have broker 0 as the leader.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-      // cross check if brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.next.message)
-
-      // shutdown server2
-      server2.shutdown
-      server2.awaitShutdown()
-      Thread.sleep(100)
-      // delete the new-topic logs
-      Utils.rm(server2.config.logDir)
-      Thread.sleep(100)
-      // start it up again. So broker 2 exists under /broker/ids, but nothing exists under /broker/topics/new-topic
-      val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-      val config2 = new KafkaConfig(props2) {
-        override val numPartitions = 4
-      }
-      server = TestUtils.createServer(config2)
-      Thread.sleep(100)
-
-      // now there are no brokers registered under test-topic.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-
-      // cross check if brokers got the messages
-      val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet2.next.message)
-
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    } finally {
-      if(server != null) server.shutdown
-      producer.close
-    }
-  }
-
-  @Test
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val timeoutMs = 500
     val props = new Properties()
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1333546)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -59,7 +59,7 @@
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -105,17 +105,17 @@
 
       var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
       assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
-      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
 
     }finally {
       TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
Index: core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala	(revision 1333546)
+++ core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala	(working copy)
@@ -20,7 +20,6 @@
 import scala.collection._
 import org.junit.{After, Before, Test}
 import junit.framework.Assert._
-import kafka.server._
 import kafka.message._
 import kafka.api._
 import kafka.utils.TestUtils
Index: core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(revision 0)
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import kafka.api._
+import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.common.ErrorMapping
+import kafka.utils.TestUtils
+
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
+  val props = createBrokerConfigs(2)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  var brokers: Seq[KafkaServer] = null
+  val topic = "foobar"
+
+  override def setUp() {
+    super.setUp()
+    brokers = configs.map(config => TestUtils.createServer(config))
+  }
+
+  override def tearDown() {
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+//  def testReplicaFetcherThread() {
+//    val partition = 0
+//    val testMessageList = List("test1", "test2", "test3", "test4")
+//    val testMessages = testMessageList.map(m => new Message(m.getBytes))
+//    val fetchSize = ConsumerConfig.FetchSize
+//    var fetchOffset: Long = 0
+//    val leaderBrokerId = configs.head.brokerId
+//    val followerBrokerId = configs.last.brokerId
+//    val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
+//
+//    // create a topic and partition
+//    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+//
+//    // send test messages to leader
+//    val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
+//    producer.send(new ProducerData[String, String](topic, "test", testMessageList))
+//
+//    // mock the replica log
+//    val replicaLogName = "%s/%s-%d".format(TestUtils.tempDir().getAbsolutePath, topic, partition)
+//    // TODO: instead, use an in memory log
+//    val replicaLogFile = new File(replicaLogName)
+//    if(replicaLogFile.exists())
+//      replicaLogFile.delete()
+//    replicaLogFile.mkdir()
+//    val replicaLog = new Log(replicaLogFile, 500, 5000, false)
+//
+//    // create replica fetch thread
+//    val testPartition = new Partition(topic, partition, SystemTime)
+//    testPartition.leaderId(Some(leaderBrokerId))
+//    val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog))
+//    val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last)
+//
+//    // start a replica fetch thread to the above broker
+//    replicaFetchThread.start()
+//
+//    Thread.sleep(30)
+//    replicaFetchThread.shutdown()
+//
+//    assertEquals(60L, testReplica.log.get.nextAppendOffset)
+//
+//    println("Closing replica log")
+//    replicaLog.close()
+//    replicaLogFile.delete()
+//  }
+
+  private def getFetchRequest(topic: String, partition: Int, offset: Long, fetchSize: Int,
+                              replicaId: Int, clientId: String, maxWait: Int = 0, minBytes: Int = 0): FetchRequest = {
+    new FetchRequestBuilder()
+      .addFetch(topic, partition, offset, fetchSize)
+      .replicaId(replicaId)
+      .clientId(clientId)
+      .maxWait(maxWait)
+      .minBytes(minBytes)
+      .build()
+  }
+
+  private def getFetchResponse(topic: String, partition: Int,
+                               fetchOffset: Long, hw: Long,
+                               messages: Seq[Message],
+                               versionId: Short = FetchResponse.CurrentVersion, correlationId: Int = 0): FetchResponse = {
+    val topicData = new TopicData(topic, Array(new PartitionData(partition, ErrorMapping.NoError, fetchOffset, hw,
+      new ByteBufferMessageSet(messages:_*))))
+    new FetchResponse(versionId, correlationId, Array(topicData))
+  }
+}
\ No newline at end of file
Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties	(revision 1333546)
+++ core/src/test/resources/log4j.properties	(working copy)
@@ -18,7 +18,7 @@
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=INFO
+log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1333546)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -28,6 +28,7 @@
 
 private[kafka] object Log {
   val FileSuffix = ".kafka"
+  val hwFileName = "highwatermark"
 
   /**
    * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
@@ -117,6 +118,11 @@
   /* The name of this log */
   val name  = dir.getName()
 
+  /* create the leader highwatermark file handle */
+  private val hwFile = new RandomAccessFile(new File(dir, Log.hwFileName), "rw")
+
+  private var hw: Long = 0
+
   private val logStats = new LogStats(this)
 
   Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
@@ -124,7 +130,7 @@
   /* Load the log segments from the log files on disk */
   private def loadSegments(): SegmentList[LogSegment] = {
     // open all the segments read-only
-    val accum = new ArrayList[LogSegment]
+    val logSegments = new ArrayList[LogSegment]
     val ls = dir.listFiles()
     if(ls != null) {
       for(file <- ls if file.isFile && file.toString.endsWith(Log.FileSuffix)) {
@@ -133,35 +139,35 @@
         val filename = file.getName()
         val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
         val messageSet = new FileMessageSet(file, false)
-        accum.add(new LogSegment(file, messageSet, start))
+        logSegments.add(new LogSegment(file, messageSet, start))
       }
     }
 
-    if(accum.size == 0) {
+    if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment
       val newFile = new File(dir, Log.nameFromOffset(0))
       val set = new FileMessageSet(newFile, true)
-      accum.add(new LogSegment(newFile, set, 0))
+      logSegments.add(new LogSegment(newFile, set, 0))
     } else {
       // there is at least one existing segment, validate and recover them/it
       // sort segments into ascending order for fast searching
-      Collections.sort(accum, new Comparator[LogSegment] {
+      Collections.sort(logSegments, new Comparator[LogSegment] {
         def compare(s1: LogSegment, s2: LogSegment): Int = {
           if(s1.start == s2.start) 0
           else if(s1.start < s2.start) -1
           else 1
         }
       })
-      validateSegments(accum)
+      validateSegments(logSegments)
 
       //make the final section mutable and run recovery on it if necessary
-      val last = accum.remove(accum.size - 1)
+      val last = logSegments.remove(logSegments.size - 1)
       last.messageSet.close()
       info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
       val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
-      accum.add(mutable)
+      logSegments.add(mutable)
     }
-    new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
+    new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
   }
 
   /**
@@ -192,6 +198,8 @@
     lock synchronized {
       for(seg <- segments.view)
         seg.messageSet.close()
+      checkpointHW()
+      hwFile.close()
     }
   }
 
@@ -317,6 +325,7 @@
       segments.view.last.messageSet.flush()
       unflushed.set(0)
       lastflushedTime.set(System.currentTimeMillis)
+      checkpointHW()
      }
   }
 
@@ -359,7 +368,38 @@
     }
     ret
   }
- 
+
+  def recoverUptoLastCheckpointedHW() {
+    if(hwFile.length() > 0) {
+      // read the last checkpointed hw from disk
+      hwFile.seek(0)
+      val lastKnownHW = hwFile.readLong()
+      // find the log segment that has this hw
+      val segmentToBeTruncated = segments.view.reverse.find(segment => lastKnownHW >= segment.start && lastKnownHW < segment.start + segment.size)
+      segmentToBeTruncated match {
+        case Some(segment) =>
+          segment.messageSet.truncateUpto(lastKnownHW - segment.start)
+          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, lastKnownHW))
+        case None =>
+          assert(lastKnownHW <= segments.view.last.start + segments.view.last.messageSet.size,
+            "Last checkpointed hw cannot be greater than the latest message in the log")
+          error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
+            .format(lastKnownHW, segments.view.head.start, segments.view.last.start + segments.view.last.messageSet.size))
+      }
+    }else
+      info("No previously checkpointed high watermark for " + name)
+  }
+
+  def setHW(latestLeaderHW: Long) {
+    hw = latestLeaderHW
+  }
+
+  def checkpointHW() {
+    hwFile.seek(0)
+    hwFile.writeLong(hw)
+    hwFile.getChannel.force(true)
+  }
+
   def getTopicName():String = {
     name.substring(0, name.lastIndexOf("-"))
   }
Index: core/src/main/scala/kafka/cluster/Replica.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Replica.scala	(revision 1333546)
+++ core/src/main/scala/kafka/cluster/Replica.scala	(working copy)
@@ -18,6 +18,73 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.common.InvalidPartitionException
+import kafka.server.{KafkaConfig, ReplicaFetcherThread}
 
-case class Replica(brokerId: Int, partition: Partition, topic: String,
-                   var log: Option[Log] = None, var hw: Long = -1, var leo: Long = -1, isLocal: Boolean = false)
\ No newline at end of file
+class Replica(val brokerId: Int, val partition: Partition, val topic: String,
+              var log: Option[Log] = None, var leoUpdateTime: Long = -1L) {
+  private var logEndOffset: Long = -1L
+  private var replicaFetcherThread: ReplicaFetcherThread = null
+
+  def leo(newLeo: Option[Long] = None): Long = {
+    log match {
+      case Some(l) =>
+        l.getHighwaterMark
+      case None =>
+        newLeo match {
+          case Some(newOffset) =>
+            logEndOffset = newOffset
+            logEndOffset
+          case None => -1L
+        }
+    }
+  }
+
+  def isLocal: Boolean = {
+    log match {
+      case Some(l) => true
+      case None => false
+    }
+  }
+
+  def hw(highwaterMarkOpt: Option[Long] = None): Long = {
+    highwaterMarkOpt match {
+      case Some(highwaterMark) =>
+        log match {
+          case Some(l) =>
+            l.setHW(highwaterMark)
+            highwaterMark
+          case None => throw new InvalidPartitionException("Unable to set highwatermark for topic %s ".format(topic) +
+            "partition %d on broker %d, since there is no local log for this partition"
+              .format(partition.partitionId, brokerId))
+        }
+      case None => -1
+    }
+  }
+
+  def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) {
+    val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId)
+    replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config)
+    replicaFetcherThread.start()
+  }
+
+  def stopReplicaFetcherThread() {
+    if(replicaFetcherThread != null)
+      replicaFetcherThread.shutdown()
+  }
+
+  def close() {
+    if(replicaFetcherThread != null)
+      replicaFetcherThread.shutdown()
+  }
+
+  override def toString(): String = {
+    val replicaString = new StringBuilder
+    replicaString.append("ReplicaId: " + brokerId)
+    replicaString.append("; Topic: " + topic)
+    replicaString.append("; Partition: " + partition.toString)
+    replicaString.append("; isLocal: " + isLocal)
+    replicaString.append("; Highwatermark: " + hw())
+    replicaString.toString()
+  }
+}
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1333546)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -16,12 +16,85 @@
  */
 package kafka.cluster
 
+import kafka.common.NoLeaderForPartitionException
+import kafka.utils.{SystemTime, Time, Logging}
+
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  * TODO: Commit queue to be added as part of KAFKA-46. Add AR, ISR, CUR, RAR state maintenance as part of KAFKA-302
  */
-case class Partition(topic: String, val partId: Int, var leader: Option[Replica] = None,
-                     assignedReplicas: Set[Replica] = Set.empty[Replica],
-                     inSyncReplicas: Set[Replica] = Set.empty[Replica],
-                     catchUpReplicas: Set[Replica] = Set.empty[Replica],
-                     reassignedReplicas: Set[Replica] = Set.empty[Replica])
+case class Partition(topic: String, partitionId: Int, time: Time = SystemTime,
+                     var inSyncReplicas: Set[Replica] = Set.empty[Replica],
+                     var catchUpReplicas: Set[Replica] = Set.empty[Replica],
+                     var reassignedReplicas: Set[Replica] = Set.empty[Replica]) extends Logging {
+  private var leaderReplicaId: Option[Int] = None
+  private var assignedReplicas: Set[Replica] = Set.empty[Replica]
+  private var highWatermarkUpdateTime: Long = -1L
+
+  def leaderId(newLeader: Option[Int] = None): Option[Int] = {
+    if(newLeader.isDefined)
+      leaderReplicaId = newLeader
+    leaderReplicaId
+  }
+
+  def assignedReplicas(replicas: Option[Set[Replica]] = None): Set[Replica] = {
+    replicas match {
+      case Some(ar) =>
+        assignedReplicas = ar
+      case None =>
+    }
+    assignedReplicas
+  }
+
+  def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
+
+  def addReplica(replica: Replica): Boolean = {
+    if(!assignedReplicas.contains(replica)) {
+      assignedReplicas += replica
+      true
+    }else false
+  }
+
+  def updateReplicaLEO(replica: Replica, leo: Long) {
+    val replicaMetadata = assignedReplicas().find(_.brokerId == replica.brokerId)
+    replicaMetadata.get.leoUpdateTime = time.milliseconds
+    replicaMetadata.get.leo(Some(leo))
+    debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
+  }
+
+  def leaderReplica(): Replica = {
+    val leaderReplicaId = leaderId()
+    if(leaderReplicaId.isDefined) {
+      val leaderReplica = assignedReplicas().find(_.brokerId == leaderReplicaId.get)
+      if(leaderReplica.isDefined) leaderReplica.get
+      else throw new IllegalStateException("No replica for leader %d in the replica manager"
+        .format(leaderReplicaId.get))
+    }else
+      throw new NoLeaderForPartitionException("Leader for topic %s partition %d does not exist"
+        .format(topic, partitionId))
+  }
+
+  def leaderHW(newHw: Option[Long] = None): Long = {
+    newHw match {
+      case Some(highWatermark) =>
+        leaderReplica().hw(newHw)
+        highWatermarkUpdateTime = time.milliseconds
+        highWatermark
+      case None =>
+        leaderReplica().hw()
+    }
+  }
+
+  def hwUpdateTime: Long = highWatermarkUpdateTime
+
+  override def toString(): String = {
+    val partitionString = new StringBuilder
+    partitionString.append("Topic: " + topic)
+    partitionString.append("; Partition: " + partitionId)
+    partitionString.append("; Leader: " + leaderId())
+    partitionString.append("; Assigned replicas: " + assignedReplicas().map(_.brokerId).mkString(","))
+    partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.append("; Reassigned replicas: " + reassignedReplicas.map(_.brokerId).mkString(","))
+    partitionString.toString()
+  }
+}
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1333546)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -55,14 +55,14 @@
       m.leader match {
         case Some(leader) =>
           val leaderReplica = new Replica(leader.id, partition, topic)
-          partition.leader = Some(leaderReplica)
+          partition.leaderId(Some(leaderReplica.brokerId))
           debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
           partition
         case None =>
           debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
           partition
       }
-    }.sortWith((s, t) => s.partId < t.partId)
+    }.sortWith((s, t) => s.partitionId < t.partitionId)
   }
 
   /**
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1333546)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -93,11 +93,8 @@
       val partitionIndex = getPartition(event.getKey, totalNumPartitions)
       val brokerPartition = topicPartitionsList(partitionIndex)
 
-      val leaderBrokerId = brokerPartition.leader match {
-        case Some(leader) => leader.brokerId
-        case None => -1
-        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-      }
+      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+      val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
 
       var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
       ret.get(leaderBrokerId) match {
@@ -108,7 +105,7 @@
           ret.put(leaderBrokerId, dataPerBroker)
       }
 
-      val topicAndPartition = (event.getTopic, brokerPartition.partId)
+      val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
       var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
       dataPerBroker.get(topicAndPartition) match {
         case Some(element) =>
@@ -126,7 +123,7 @@
     debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
     debug("Broker partitions registered for topic: %s are %s"
-      .format(pd.getTopic, topicPartitionsList.map(p => p.partId).mkString(",")))
+      .format(pd.getTopic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
     topicPartitionsList
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1333546)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -208,7 +208,13 @@
     needRecover.set(false)    
     len - validUpTo
   }
-  
+
+  def truncateUpto(hw: Long) = {
+    channel.truncate(hw)
+    setSize.set(hw)
+    setHighWaterMark.set(hw)
+  }
+
   /**
    * Read, validate, and discard a single message, returning the next valid offset, and
    * the message being validated
Index: core/src/main/scala/kafka/common/QueueFullException.scala
===================================================================
--- core/src/main/scala/kafka/common/QueueFullException.scala	(revision 1304473)
+++ core/src/main/scala/kafka/common/QueueFullException.scala	(working copy)
@@ -1,3 +1,5 @@
+package kafka.common
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,8 +17,6 @@
  * limitations under the License.
  */
 
-package kafka.producer.async
-
 /* Indicates the queue for sending messages is full of unsent messages */
 class QueueFullException(message: String) extends RuntimeException(message) {
   def this() = this(null)
Index: core/src/main/scala/kafka/network/SocketServer.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala	(revision 1333546)
+++ core/src/main/scala/kafka/network/SocketServer.scala	(working copy)
@@ -64,6 +64,7 @@
    * Shutdown the socket server
    */
   def shutdown() = {
+    info("Shutting down socket server")
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1333546)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -133,7 +133,7 @@
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
     try {
       // NOTE: first increment epoch, then become leader
       val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
@@ -143,7 +143,7 @@
       updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
         "%s;%d".format(currentISR.mkString(","), newEpoch))
       info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
-      Some(newEpoch)
+      Some(newEpoch, currentISR)
     } catch {
       case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
       case oe => None

Property changes on: core/src/main/scala/kafka/server
___________________________________________________________________
Added: svn:ignore
   + FollowerReplica.scala


Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1333546)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -20,11 +20,12 @@
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.net.InetAddress
-import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
 import kafka.admin.AdminUtils
 import java.lang.{Thread, IllegalStateException}
+import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, KafkaZookeeperClient}
+import kafka.utils.ZkUtils._
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -33,8 +34,10 @@
  *
  */
 class KafkaZooKeeper(config: KafkaConfig,
-                     addReplicaCbk: (String, Int) => Replica,
-                     getReplicaCbk: (String, Int) => Option[Replica]) extends Logging {
+                     addReplicaCbk: (String, Int, Set[Int]) => Replica,
+                     getReplicaCbk: (String, Int) => Option[Replica],
+                     becomeLeader: (Replica, Seq[Int]) => Unit,
+                     becomeFollower: (Replica, Int) => Unit) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
@@ -174,8 +177,9 @@
   private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
     partitions.foreach { partition =>
       val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+      info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
       if(assignedReplicas.contains(config.brokerId)) {
-        val replica = addReplicaCbk(topic, partition)
+        val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
         startReplica(replica)
       } else
         warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
@@ -184,25 +188,34 @@
   }
 
   private def startReplica(replica: Replica) {
-    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partId, replica.brokerId))
-    replica.log match {
-      case Some(log) =>  // log is already started
-      case None =>
-      // TODO: Add log recovery upto the last checkpointed HW as part of KAFKA-46
-    }
-    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partId) match {
-      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
+    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId,
+      replica.brokerId))
+    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,
+        leader))
+        // check if this broker is the leader, if not, then become follower
+        if(leader != config.brokerId)
+          becomeFollower(replica, leader)
       case None => // leader election
         leaderElection(replica)
     }
   }
 
   def leaderElection(replica: Replica) {
-    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partId))
+    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
     // read the AR list for replica.partition from ZK
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt)
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
     // TODO: read the ISR as part of KAFKA-302
-    if(assignedReplicas.contains(replica.brokerId)) {
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
+    val canBecomeLeader = inSyncReplicas.size > 0 match {
+      case true => inSyncReplicas.contains(replica.brokerId)
+      case false =>
+        // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
+        assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " +
+          "%s partition %d".format(replica.topic, replica.partition.partitionId))
+        assignedReplicas.contains(replica.brokerId)
+    }
+    if(canBecomeLeader) {
       // wait for some time if it is not the preferred replica
       try {
         if(replica.brokerId != assignedReplicas.head)
@@ -210,16 +223,31 @@
       }catch {
         case e => // ignoring
       }
-      val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
-      newLeaderEpoch match {
-        case Some(epoch) =>
-          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
+      val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId, replica.brokerId)
+      newLeaderEpochAndISR match {
+        case Some(epochAndISR) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partitionId))
           // TODO: Become leader as part of KAFKA-302
+          becomeLeader(replica, epochAndISR._2)
         case None =>
       }
     }
   }
 
+  def updateInSyncReplicasForPartition(topic: String, partition: Int, newISR: Set[Int]) = {
+    val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partition.toString))
+    if(replicaListAndEpochString == null) {
+      throw new NoLeaderForPartitionException(("Illegal partition state. ISR cannot be updated for topic " +
+        "%s partition %d since leader and ISR do not exist in ZK".format(topic, partition)))
+    }
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      val epoch = replicasAndEpochInfo.last
+      updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partition.toString),
+        "%s;%s".format(newISR.mkString(","), epoch))
+    }
+  }
+
   class TopicChangeListener extends IZkChildListener with Logging {
 
     @throws(classOf[Exception])
@@ -270,9 +298,20 @@
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       // handle leader change event for path
-      val newLeader: String = data.asInstanceOf[String]
-      debug("Leader change listener fired for path %s. New leader is %s".format(dataPath, newLeader))
-      // TODO: update the leader in the list of replicas maintained by the log manager
+      val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
+      val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
+      val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
+      debug("Leader change listener fired for path %s. New leader is %d. New epoch is %d".format(dataPath, newLeader, newEpoch))
+      // TODO: update the leader in the list of replicas maintained by the replica manager
+      val topicPartitionInfo = dataPath.split("/")
+      val topic = topicPartitionInfo.take(3).last
+      val partition = topicPartitionInfo.takeRight(2).head.toInt
+      info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
+      val replica = getReplicaCbk(topic, partition).getOrElse(null)
+      assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
+        .format(topic, partition, config.brokerId))
+      replica.partition.leaderId(Some(newLeader))
+      assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
     }
 
     @throws(classOf[Exception])
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1333546)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -20,6 +20,7 @@
 import java.util.Properties
 import kafka.utils.{Utils, ZKConfig}
 import kafka.message.Message
+import kafka.consumer.ConsumerConfig
 
 /**
  * Configuration settings for the kafka server
@@ -105,7 +106,25 @@
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
+  val keepInSyncTime = Utils.getLong(props, "isr.in.sync.time.ms", 30000)
+
   /* size of the state change request queue in Zookeeper */
   val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
 
+  /**
+   * Config options relevant to a follower for a replica
+   */
+  /** the socket timeout for network requests */
+  val socketTimeoutMs = Utils.getInt(props, "follower.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+
+  /** the socket receive buffer for network requests */
+  val socketBufferSize = Utils.getInt(props, "follower.socket.buffersize", ConsumerConfig.SocketBufferSize)
+
+  /** the number of bytes of messages to attempt to fetch */
+  val fetchSize = Utils.getInt(props, "follower.fetch.size", ConsumerConfig.FetchSize)
+
+  val maxWaitTimeMs = Utils.getInt(props, "follower.fetch.wait.time.ms", 500)
+
+  val minBytes = Utils.getInt(props, "follower.fetch.min.bytes", 4086)
+
  }
Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision 1333546)
+++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import org.apache.log4j._
 import kafka.network._
 import kafka.utils._
 
@@ -42,7 +41,7 @@
 
 class KafkaRequestHandlerPool(val requestChannel: RequestChannel, 
                               val apis: KafkaApis, 
-                              numThreads: Int) { 
+                              numThreads: Int) extends Logging {
   
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
@@ -53,10 +52,12 @@
   }
   
   def shutdown() {
+    info("Shutting down request handlers")
     for(handler <- runnables)
       handler.shutdown
     for(thread <- threads)
       thread.join
+    info("Request handlers shut down")
   }
   
 }
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1333546)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -21,15 +21,15 @@
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
-import kafka.cluster.Replica
 import java.util.concurrent._
 import atomic.AtomicBoolean
+import kafka.cluster.{Partition, Replica}
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig) extends Logging {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
@@ -39,7 +39,9 @@
   var requestHandlerPool: KafkaRequestHandlerPool = null
   private var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
-  val replicaManager = new ReplicaManager(config)
+  private var replicaManager: ReplicaManager = null
+  private var leaderReplicas = new PriorityBlockingQueue[Partition](100, new PartitionHWComparator)
+  private var isrExpirationThread: ISRExpirationThread = null
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -68,9 +70,11 @@
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
+    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, becomeLeader, becomeFollower)
 
-    val apis = new KafkaApis(socketServer.requestChannel, logManager, kafkaZookeeper)
+    replicaManager = new ReplicaManager(config, time, leaderReplicas)
+
+    val apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
     socketServer.startup
 
@@ -84,6 +88,14 @@
 
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup
+
+    // start ISR expiration thread
+    isrExpirationThread =  new ISRExpirationThread("ISRExpirationThread-" + config.brokerId,
+                                                              config, time,
+                                                              leaderReplicas,
+                                                              kafkaZookeeper.updateInSyncReplicasForPartition)
+    isrExpirationThread.start()
+
     info("Server started.")
   }
   
@@ -95,6 +107,10 @@
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("Shutting down Kafka server with id " + config.brokerId)
+      if(isrExpirationThread != null)
+        isrExpirationThread.shutdown()
+      if(replicaManager != null)
+        replicaManager.close()
       kafkaZookeeper.close
       if (socketServer != null)
         socketServer.shutdown()
@@ -117,13 +133,41 @@
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
-  def addReplica(topic: String, partition: Int): Replica = {
+  def becomeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    // stop replica fetcher thread, if any
+    replica.stopReplicaFetcherThread()
+    // read and cache the ISR
+    replicaManager.updateLeaderAndISR(replica, currentISRInZk)
+    // also add this partition to the ISR expiration priority queue
+    // TODO: Only do this for the partitions for which the leader is the current broker
+    leaderReplicas.put(replica.partition)
+  }
+
+  def becomeFollower(replica: Replica, leaderBrokerId: Int) {
+    // remove this replica's partition from the ISR expiration queue
+    // TODO: This might need some synchronization with the ISR expiration thread
+    leaderReplicas.remove(replica.partition)
+    replica.log match {
+      case Some(log) =>  // log is already started
+        log.recoverUptoLastCheckpointedHW()
+      case None =>
+    }
+    // get leader for this replica
+    // TODO: Become follower only if it is not already following the same leader
+    val leaderBroker = ZkUtils.getBrokerInfoFromIds(kafkaZookeeper.getZookeeperClient, List(leaderBrokerId)).head
+    // start fetcher thread to current leader
+    replica.startReplicaFetcherThread(leaderBroker, config)
+  }
+
+  def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
+    info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
     // get local log
     val log = logManager.getOrCreateLog(topic, partition)
-    replicaManager.addLocalReplica(topic, partition, log)
+    replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
   }
 
-  def getReplica(topic: String, partition: Int): Option[Replica] = replicaManager.getReplica(topic, partition)
+  def getReplica(topic: String, partition: Int): Option[Replica] =
+    replicaManager.getReplica(topic, partition)
 
   def getLogManager(): LogManager = logManager
 
Index: core/src/main/scala/kafka/server/RequestPurgatory.scala
===================================================================
--- core/src/main/scala/kafka/server/RequestPurgatory.scala	(revision 1333546)
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala	(working copy)
@@ -21,7 +21,6 @@
 import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.api._
 import kafka.network._
 import kafka.utils._
 
Index: core/src/main/scala/kafka/server/ISRExpirationThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ISRExpirationThread.scala	(revision 0)
+++ core/src/main/scala/kafka/server/ISRExpirationThread.scala	(revision 0)
@@ -0,0 +1,94 @@
+package kafka.server
+
+import java.util.concurrent.locks.ReentrantLock
+import java.util.{Date, Comparator}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{CountDownLatch, PriorityBlockingQueue}
+import kafka.utils.{Time, Logging}
+import kafka.cluster.Partition
+
+class ISRExpirationThread(name: String, config: KafkaConfig, time: Time,
+                          leaderReplicas: PriorityBlockingQueue[Partition],
+                          updateISRForPartition: (String, Int, Set[Int]) => Unit)
+  extends Thread(name) with Logging {
+  private val keepInSyncLock = new ReentrantLock()
+  private val hasKeepInSyncTimeExpired = keepInSyncLock.newCondition()
+  private val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+
+  override def run() {
+    try {
+      keepInSyncLock.lock()
+      info("Started ISR expiration thread on leader " + config.brokerId)
+      while(isRunning.get()) {
+        val slowestPartition = leaderReplicas.take()
+        info("Fetched partition with the earliest ISR expiration time " + slowestPartition.toString())
+        var expirationTime = new Date(slowestPartition.hwUpdateTime + config.keepInSyncTime)
+        var expirationTimeNotReached = true
+        while(expirationTime.getTime > time.milliseconds) {
+          info("Waiting until %s for partition %s's ISR expiration time".format(expirationTime.toString, slowestPartition.toString()))
+          expirationTimeNotReached = hasKeepInSyncTimeExpired.awaitUntil(expirationTime)
+          if(expirationTimeNotReached) {
+            // maybe the head of the priority queue was updated. So compute expiration deadline again
+            expirationTime = new Date(leaderReplicas.peek().hwUpdateTime + config.keepInSyncTime)
+          }
+          // else, it must've been a spurious wakeup or the deadline has expired
+        }
+        info("%d ms has elapsed. Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
+          .format(config.keepInSyncTime))
+        // deadline has elapsed. Iterate through priority queue and shrink the ISR of partitions whose HW hasn't been
+        // updated for keep.in.sync.time.ms
+        val currentTime = time.milliseconds
+
+        while((slowestPartition.hwUpdateTime + config.keepInSyncTime) <= currentTime) {
+          val topic = slowestPartition.topic
+          val partitionId = slowestPartition.partitionId
+
+          // shrink the ISR for this partition
+          val inSyncReplicas = slowestPartition.assignedReplicas().filter(r => r.leo() >= slowestPartition.leaderHW())
+
+          info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId,
+            inSyncReplicas.map(_.brokerId).mkString(",")))
+          // update in zk and in memory
+          updateISRForPartition(topic, partitionId, inSyncReplicas.map(_.brokerId).toSet)
+
+          val leaderReplica = slowestPartition.leaderReplica()
+          leaderReplica.partition.inSyncReplicas = inSyncReplicas.toSet
+          slowestPartition.leaderHW(Some(leaderReplica.hw()))
+          info("Setting leader HW for topic %s partition %d to %d".format(topic, partitionId, leaderReplica.hw()))
+          // add this partition with updated HW timestamp and HW back into the priority queue
+          leaderReplicas.offer(slowestPartition)
+        }
+      }
+    }catch {
+      case e: InterruptedException => info("ISR expiration thread %s interrupted. Shutting down".format(name))
+      case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
+    }
+    finally {
+      shutdownComplete()
+      keepInSyncLock.unlock()
+    }
+  }
+
+  private def shutdownComplete() = {
+    shutdownLatch.countDown
+  }
+
+  def shutdown() {
+    info("Shutting down ISR expiration thread")
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("ISR expiration thread shutdown completed")
+  }
+}
+
+class PartitionHWComparator extends Comparator[Partition] {
+  override def compare(partition1: Partition, partition2: Partition): Int = {
+    if(partition1.hwUpdateTime < partition2.hwUpdateTime)
+      -1
+    else if(partition1.hwUpdateTime > partition2.hwUpdateTime)
+      1
+    else 0
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 0)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 0)
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+import kafka.api.FetchRequestBuilder
+import kafka.utils.Logging
+import kafka.cluster.{Broker, Replica}
+import kafka.consumer.SimpleConsumer
+
+class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig)
+  extends Thread(name) with Logging {
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+  private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port,
+    config.socketTimeoutMs, config.socketBufferSize)
+
+  override def run() {
+    try {
+      while(isRunning.get()) {
+        val builder = new FetchRequestBuilder().
+          clientId(name).
+          replicaId(replica.brokerId).
+          maxWait(config.maxWaitTimeMs).
+          minBytes(config.minBytes)
+
+        // TODO: Keep this simple single fetch for now. Change it to fancier multi fetch when message replication
+        // actually works
+        builder.addFetch(replica.topic, replica.partition.partitionId, replica.leo(), config.fetchSize)
+
+        val fetchRequest = builder.build()
+        val response = replicaConsumer.fetch(fetchRequest)
+        trace("Follower %d issued fetch request %s to leader %d".format(replica.brokerId, fetchRequest, leaderBroker.id))
+        // append messages to local log
+        replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
+        replica.hw(Some(response.data.head.partitionData.head.hw))
+        trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+          .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.hw()))
+      }
+    }catch {
+      case e: InterruptedException => info("Replica fetcher thread %s interrupted. Shutting down".format(name))
+      case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1)
+    }
+    shutdownComplete()
+  }
+
+  private def shutdownComplete() = {
+    replicaConsumer.close()
+    shutdownLatch.countDown
+  }
+
+  def shutdown() {
+    info("Shutting down replica fetcher thread")
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("Replica fetcher thread shutdown completed")
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1333546)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.io.IOException
-import java.lang.IllegalStateException
 import java.util.concurrent.atomic._
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
@@ -30,12 +29,14 @@
 import kafka.utils.{SystemTime, Logging}
 import kafka.common._
 import scala.math._
+import java.lang.IllegalStateException
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
-  
+class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
+                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
+
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -74,7 +75,7 @@
     }
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
-       val topicData = readMessageSets(fetchReq.fetch.offsetInfo)
+       val topicData = readMessageSets(fetchReq.fetch)
        val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
        requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
@@ -135,11 +136,13 @@
         val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode), -1)
         requestChannel.sendResponse(channelResponse)
     }
-    
+
+    maybeAddReplicaToISR(fetchRequest)
+
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
     if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
-      val topicData = readMessageSets(fetchRequest.offsetInfo)
+      val topicData = readMessageSets(fetchRequest)
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     } else {
@@ -172,11 +175,44 @@
     }
     totalBytes
   }
-  
+
+  def maybeAddReplicaToISR(fetchRequest: FetchRequest) {
+    val offsets = fetchRequest.offsetInfo
+    for(offsetDetail <- offsets) {
+      val topic = offsetDetail.topic
+      val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
+      for( (partition, offset) <- (partitions, offsets).zipped.map((_,_)) ) {
+        fetchRequest.replicaId match {
+          case -1 =>
+          case _ =>
+            val replicaOpt = replicaManager.getLeaderReplica(topic, partition)
+            replicaOpt match {
+              case Some(replica) =>
+                replicaManager.updateReplicaLEO(replica, offset)
+                // check if this replica needs to be added to the ISR
+                if(replicaManager.checkIfISRCanBeExpanded(replica)) {
+                  val newISR = replica.partition.inSyncReplicas.map(_.brokerId) + replica.brokerId
+                  // update ISR in ZK
+                  kafkaZookeeper.updateInSyncReplicasForPartition(topic, partition, newISR)
+                  // update ISR in memory
+                  replicaManager.addReplicaToISR(replica)
+                  info("Added replica %d to ISR for topic %s partition %d. New ISR is %s".format(replica.brokerId,
+                    replica.topic, replica.partition.partitionId, newISR.mkString(",")))
+                }
+              case None =>
+                throw new IllegalStateException("No leader replica for topic %s partition %d in replica manager on %d"
+                  .format(topic, partition, replicaManager.config.brokerId))
+            }
+        }
+      }
+    }
+  }
+
   /**
    * Read from all the offset details given and produce an array of topic datas
    */
-  private def readMessageSets(offsets: Seq[OffsetDetail]): Array[TopicData] = {
+  private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
+    val offsets = fetchRequest.offsetInfo
     val fetchedData = new mutable.ArrayBuffer[TopicData]()
     for(offsetDetail <- offsets) {
       val info = new mutable.ArrayBuffer[PartitionData]()
@@ -184,8 +220,40 @@
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
         val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
-          case Left(err) => new PartitionData(partition, err, offset, MessageSet.Empty)
-          case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages)
+          case Left(err) =>
+            fetchRequest.replicaId match {
+              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+              case _ =>
+                val replicaOpt = replicaManager.getLeaderReplica(topic, partition)
+                replicaOpt match {
+                  case Some(replica) =>
+                    new PartitionData(partition, err, offset, replica.hw(), MessageSet.Empty)
+                  case None =>
+                    new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+                }
+            }
+          case Right(messages) =>
+            fetchRequest.replicaId match {
+              case -1 => new PartitionData(partition, ErrorMapping.NoError, offset, -1L, messages)
+              case _ => // fetch request from a follower
+                val replicaOpt = replicaManager.getLeaderReplica(topic, partition)
+                replicaOpt match {
+                  case Some(replica) =>
+                    replicaManager.updateReplicaLEO(replica, offset)
+                    // TODO: this ISR-expansion logic duplicates maybeAddReplicaToISR, which already ran for this fetch — consolidate in one place
+                    if(replicaManager.checkIfISRCanBeExpanded(replica)) {
+                      val newISR = replica.partition.inSyncReplicas.map(_.brokerId) + replica.brokerId
+                      // update ISR in ZK
+                      kafkaZookeeper.updateInSyncReplicasForPartition(topic, partition, newISR)
+                      // update ISR in memory
+                      replicaManager.addReplicaToISR(replica)
+                    }
+                    new PartitionData(partition, ErrorMapping.NoError, offset, replica.hw(), messages)
+                  case None =>
+                    throw new IllegalStateException("No leader replica for topic %s partition %d in replica manager on %d"
+                      .format(topic, partition, replicaManager.config.brokerId))
+                }
+            }
         }
         info.append(partitionInfo)
       }
@@ -286,7 +354,7 @@
      * When a request expires just answer it with whatever data is present
      */
     def expire(delayed: DelayedFetch) {
-      val topicData = readMessageSets(delayed.fetch.offsetInfo)
+      val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1333546)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -18,47 +18,150 @@
 
 import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
-import kafka.utils.Logging
 import collection.mutable
+import kafka.common.NoLeaderForPartitionException
+import java.util.concurrent.PriorityBlockingQueue
+import java.lang.IllegalStateException
+import kafka.utils.{Time, Logging}
 
-class ReplicaManager(config: KafkaConfig) extends Logging {
+class ReplicaManager(val config: KafkaConfig, time: Time,
+                     leaderReplicas: PriorityBlockingQueue[Partition]) extends Logging {
 
-  private var replicas: mutable.Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
+  private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
 
-  def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
-        r.log match {
+  def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
+    val partition = makeSurePartitionExists(topic, partitionId, assignedReplicaIds)
+    val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+
+    val replicaOpt = partition.getReplica(config.brokerId)
+    replicaOpt match {
+      case Some(replica) =>
+        info("Changing remote replica %s into a local replica".format(replica.toString))
+        replica.log match {
           case None =>
-            r.log = Some(log)
-          case Some(l) => // nothing to do since log already exists
+            replica.log = Some(log)
+          case Some(log) => // nothing to do since log already exists
         }
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
-        replicas += (topic, partitionId) -> replica
-        info("Added local replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        partition.addReplica(localReplica)
     }
-    replicas.get((topic, partitionId)).get
+    val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
+    partition.assignedReplicas(Some(assignedReplicas))
+    // the replica objects for the assigned replicas were resolved above when setting assignedReplicas
+    info("Added local replica for topic %s partition %s on broker %d"
+      .format(localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+    localReplica
   }
 
-  def addRemoteReplica(topic: String, partitionId: Int): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
+  def makeSurePartitionExists(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
+    val newPartition = allReplicas.contains((topic, partitionId))
+    newPartition match {
+      case true => // partition exists, do nothing
+        allReplicas.get((topic, partitionId)).get
+      case false => // create remote replicas for each replica id in assignedReplicas
+        val partition = new Partition(topic, partitionId, time)
+        allReplicas += (topic, partitionId) -> partition
+        (assignedReplicaIds - config.brokerId).foreach(
+          replicaId => addRemoteReplica(topic, partitionId, replicaId, partition))
+        partition
+    }
+  }
+
+  def assurePartitionExists(topic: String, partitionId: Int): Partition = {
+    val partitionOpt = allReplicas.get((topic, partitionId))
+    partitionOpt match {
+      case Some(partition) => partition
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
-        replicas += (topic, partitionId) -> replica
-        info("Added remote replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        throw new IllegalStateException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
+        .format(topic, partitionId, config.brokerId))
     }
-    replicas.get((topic, partitionId)).get
   }
 
-  def getReplica(topic: String, partitionId: Int): Option[Replica] = {
-    replicas.get((topic, partitionId))
+  def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
+    val remoteReplica = new Replica(replicaId, partition, topic)
+
+    val replicaAdded = partition.addReplica(remoteReplica)
+    if(replicaAdded)
+      info("Added remote replica %d for topic %s partition %s on broker %d"
+        .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId))
+    remoteReplica
   }
-}
\ No newline at end of file
+
+  def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
+    val replicasOpt = allReplicas.get((topic, partitionId))
+    replicasOpt match {
+      case Some(replicas) =>
+        replicas.getReplica(replicaId)
+      case None =>
+        None
+    }
+  }
+
+  def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
+    val replicasOpt = allReplicas.get((topic, partitionId))
+    replicasOpt match {
+      case Some(replicas) =>
+        Some(replicas.leaderReplica())
+      case None =>
+        throw new IllegalStateException("Getting leader replica failed. Partition replica metadata for topic " +
+          "%s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId))
+    }
+  }
+
+  def getPartition(topic: String, partitionId: Int): Option[Partition] =
+    allReplicas.get((topic, partitionId))
+
+  def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+    // set the replica leo
+    val partition = assurePartitionExists(replica.topic, replica.partition.partitionId)
+    partition.updateReplicaLEO(replica, fetchOffset)
+  }
+
+  def updateLeaderAndISR(newLeaderReplica: Replica, isrInZk: Seq[Int]) = {
+    val partition = assurePartitionExists(newLeaderReplica.topic, newLeaderReplica.partition.partitionId)
+    // update partition's leader
+    partition.leaderId(Some(newLeaderReplica.brokerId))
+    // update partition's ISR
+    val inSyncReplicas = isrInZk.map { rid =>
+      partition.getReplica(rid) match {
+        case Some(replica) => replica
+        case None => throw new IllegalStateException("In sync replica %d for topic %s ".format(rid, newLeaderReplica.topic) +
+          "partition %d doesn't exist in replica manager".format(newLeaderReplica.partition.partitionId))
+      }
+    }
+    partition.inSyncReplicas = inSyncReplicas.toSet
+  }
+
+  def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
+    val partitionOpt = allReplicas.get((replica.topic, replica.partition.partitionId))
+    partitionOpt match {
+      case Some(partition) =>
+        if(partition.inSyncReplicas.contains(replica)) false
+        else if(partition.assignedReplicas().contains(replica)) {
+          val leaderHW = partition.leaderHW()
+          replica.leo() >= leaderHW
+        }
+        else throw new IllegalStateException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+          " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+      case None => // throw exception
+        throw new NoLeaderForPartitionException("Check for ISR expansion failed on %d. Partition for ".format(config.brokerId) +
+          "topic %s partition %d doesn't exist in the replica manager".format(replica.topic, replica.partition.partitionId))
+    }
+  }
+
+  def addReplicaToISR(replica: Replica): Set[Replica] = {
+    val partitionOpt = getPartition(replica.topic, replica.partition.partitionId)
+    partitionOpt match {
+      case Some(partition) =>
+        partition.inSyncReplicas += replica
+        partition.inSyncReplicas
+      case None => // throw exception
+        throw new NoLeaderForPartitionException("Adding replica to ISR failed on %d. Partition for ".format(config.brokerId) +
+          "topic %s partition %d doesn't exist in the replica manager".format(replica.topic, replica.partition.partitionId))
+    }
+  }
+
+  def close() {
+    allReplicas.foreach(_._2.assignedReplicas().foreach(_.close()))
+  }
+}
Index: core/src/main/scala/kafka/api/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchResponse.scala	(revision 1333546)
+++ core/src/main/scala/kafka/api/FetchResponse.scala	(working copy)
@@ -29,18 +29,20 @@
     val partition = buffer.getInt
     val error = buffer.getInt
     val initialOffset = buffer.getLong
+    val hw = buffer.getLong()
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
   }
 }
 
-case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
-  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
+case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L,
+                         messages: MessageSet) {
+  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8
 
-  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
+  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 
   def translatePartition(topic: String, randomSelector: String => Int): Int = {
     if (partition == ProducerRequest.RandomPartition)
@@ -123,6 +125,14 @@
     }
     messageSet.asInstanceOf[ByteBufferMessageSet]
   }
+
+  def highWatermark(topic: String, partition: Int): Long = {
+    topicMap.get(topic) match {
+      case Some(topicData) =>
+        TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L)
+      case None => -1L
+    }
+  }
 }
 
 // SENDS
@@ -131,10 +141,11 @@
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(20)
+  private val buffer = ByteBuffer.allocate(28)
   buffer.putInt(partitionData.partition)
   buffer.putInt(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
+  buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes.intValue())
   buffer.rewind()
 
