Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -32,10 +32,10 @@
 import collection.mutable.ListBuffer
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import scala.collection.Map
-import kafka.serializer.Encoder
 import kafka.api.{ProducerRequest, TopicData, PartitionData}
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
+import kafka.serializer.{DefaultEncoder, Encoder}
 
 /**
  * Utility functions to help with testing
@@ -122,6 +122,7 @@
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval", "1")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("follower.socket.timeout.ms", "1500")
     props
   }
   
@@ -280,12 +281,13 @@
   /**
    * Create a producer for the given host and port
    */
-  def createProducer[K, V](zkConnect: String): Producer[K, V] = {
+  def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
     val props = new Properties()
     props.put("zk.connect", zkConnect)
     props.put("buffer.size", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
+    props.put("serializer.class", encoder.getClass.getName) // getName, not getCanonicalName: the latter is null for anonymous encoders and not a loadable name for nested ones (presumably the producer loads this class by name - confirm)
     new Producer[K, V](new ProducerConfig(props))
   }
 
Index: core/src/test/scala/unit/kafka/log/SegmentListTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/SegmentListTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/log/SegmentListTest.scala	(working copy)
@@ -49,6 +49,53 @@
   }
   
   @Test
+  def testTruncLast() { // three scenarios; each checks the list after truncLast, the returned elements, and a view taken beforehand
+    {
+      val hd = List(1,2,3)
+      val tail = List(4,5,6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      val deleted = sl.truncLast(2) // asserted below: sl keeps 1,2,3 and the call returns 4,5,6
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+
+    {
+      val hd = List(1,2,3,4,5)
+      val tail = List(6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      try {
+        sl.truncLast(6) // NOTE(review): if this call throws, the two calls below and fail() are skipped, so only this index is really verified
+        sl.truncLast(5) // NOTE(review): unclear from here whether 5 (the last valid index) is meant to be rejected - confirm against SegmentList.truncLast
+        sl.truncLast(-1)
+        fail("Attempt to truncate with illegal index should fail")
+      }catch {
+        case e: IllegalArgumentException => // this is ok
+      }
+      val deleted = sl.truncLast(4) // asserted below: sl keeps 1..5 and the call returns 6
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+
+    {
+      val hd = List(1)
+      val tail = List(2,3,4,5,6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      val deleted = sl.truncLast(0) // asserted below: sl keeps only the first element and the call returns the rest
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+  }
+
+  @Test
   def testTruncBeyondList() {
     val sl = new SegmentList(List(1, 2))
     sl.trunc(3)
Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(working copy)
@@ -48,14 +48,14 @@
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, false)
+    new DiskLog(logDir, 1024, 1000, false)
   }
   
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, false)
+      new DiskLog(logDir, 1024, 1000, false)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: IllegalStateException => "This is good"
@@ -64,7 +64,7 @@
   
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new DiskLog(logDir, 1024, 1000, false)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -81,7 +81,7 @@
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new DiskLog(logDir, 1024, 1000, false)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -101,7 +101,7 @@
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, false)
+    val log = new DiskLog(logDir, 100, 1000, false)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -156,7 +156,7 @@
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new DiskLog(logDir, 100, 1000, false)
       val curOffset = log.nextAppendOffset
       assertEquals(curOffset, 0)
 
@@ -169,7 +169,7 @@
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new DiskLog(logDir, 100, 1000, false)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -62,33 +62,38 @@
     super.tearDown()
   }
 
-  @Test
-  def testEmptyLogs() {
-    val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
-    assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
+  // TODO: Is this test necessary? It is commented out for now; it contains several long sleeps (Thread.sleep(2000))
+//  @Test
+//  def testEmptyLogs() {
+//    CreateTopicCommand.createTopic(zkClient, "test", 1)
+//    val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
+//    assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
+//
+//    Thread.sleep(2000)
+//    val name = "test"
+//    val logFile = new File(logDir, name + "-0")
+//
+//    {
+//      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
+//      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+//      assertTrue(!logFile.exists())
+//    }
+//    Thread.sleep(2000)
+//
+//    {
+//      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
+//      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+//      assertTrue(!logFile.exists())
+//    }
+//    Thread.sleep(2000)
+//
+//    {
+//      val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
+//      assertEquals( 0, offsets.length )
+//      assertTrue(!logFile.exists())
+//    }
+//  }
 
-    val name = "test"
-    val logFile = new File(logDir, name + "-0")
-    
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
-      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
-      assertTrue(!logFile.exists())
-    }
-
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
-      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
-      assertTrue(!logFile.exists())
-    }
-
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
-      assertEquals( 0, offsets.length )
-      assertTrue(!logFile.exists())
-    }
-  }
-
   @Test
   def testGetOffsetsBeforeLatestTime() {
     val topicPartition = "kafka-" + 0
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -27,9 +27,9 @@
 import kafka.network._
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
-import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
+import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -80,6 +80,7 @@
     // topic metadata request only requires 2 APIs from the log manager
     val logManager = EasyMock.createMock(classOf[LogManager])
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
     EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
     EasyMock.replay(logManager)
@@ -94,7 +95,8 @@
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, kafkaZookeeper)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper,
+      (topic: String, partition: Int, isr: Seq[Int], None) => Unit)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -30,10 +30,11 @@
 import kafka.producer.{ProducerData, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
+import kafka.admin.CreateTopicCommand
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
-  val numNodes = 2
+  val numNodes = 1
   val configs = 
     for(props <- TestUtils.createBrokerConfigs(numNodes))
       yield new KafkaConfig(props)
@@ -54,6 +55,7 @@
 
   override def setUp() {
     super.setUp
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
     fetcher.stopConnectionsToAllBrokers
     fetcher.startConnections(topicInfos, cluster, null)
@@ -69,11 +71,9 @@
     var count = sendMessages(perNode)
     waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     fetch(count)
-    Thread.sleep(100)
     assertQueueEmpty()
     count = sendMessages(perNode)
     fetch(count)
-    Thread.sleep(100)
     assertQueueEmpty()
   }
   
Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(working copy)
@@ -18,8 +18,6 @@
 package kafka.integration
 
 import junit.framework.Assert._
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
@@ -61,10 +59,10 @@
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
     
-  def testResetToLatestWhenOffsetTooHigh() = 
+  def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
-    
-  def testResetToLatestWhenOffsetTooLow() = 
+
+  def testResetToLatestWhenOffsetTooLow() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
   
   /* Produce the given number of messages, create a consumer with the given offset policy, 
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -28,9 +28,9 @@
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
 import kafka.message.Message
-import java.io.File
 import kafka.utils.{TestZKUtils, TestUtils}
 import scala.collection._
+import kafka.admin.CreateTopicCommand
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -90,7 +90,7 @@
       case e: FetchRequestFormatException => "success"
     }
   }
-  
+
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
     val props = new Properties()
@@ -153,7 +153,7 @@
         messages += topic -> messageList
         producer.send(producerData)
         builder.addFetch(topic, partition, 0, 10000)
-      }
+    }
 
       // wait a bit for produced message to be available
       Thread.sleep(700)
@@ -194,8 +194,9 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
+        for( (topic, partition) <- topics) {
           response.messageSet(topic, -1).iterator
+        }
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: InvalidPartitionException => "this is good"
@@ -323,9 +324,8 @@
 
   def testConsumerNotExistTopic() {
     val newTopic = "new-topic"
+    CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
-    val logFile = new File(config.logDir, newTopic + "-0")
-    assertTrue(!logFile.exists)
   }
 }
Index: core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(working copy)
@@ -27,6 +27,7 @@
 
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
 
 class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -59,6 +60,7 @@
 
   // test for reading data with magic byte 0
   def testProtocolVersion0() {
+    CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
     val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
     var fetchOffset: Long = 0L
     var messageCount: Int = 0
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -112,7 +112,7 @@
     Assert.assertEquals(request.correlationId, response.correlationId)
     Assert.assertEquals(response.errors.length, response.offsets.length)
     Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, _))
+    response.errors.foreach(Assert.assertEquals(ErrorMapping.InvalidPartitionCode.toShort, _))
     response.offsets.foreach(Assert.assertEquals(-1L, _))
 
     // #2 - test that we get correct offsets when partition is owner by broker
@@ -132,7 +132,7 @@
     Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.WrongPartitionCode.toShort, response2.errors(1))
+    Assert.assertEquals(ErrorMapping.InvalidPartitionCode.toShort, response2.errors(1))
     Assert.assertEquals(-1, response2.offsets(1))
   }
 
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -91,32 +91,31 @@
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
-
+    // create topic with 1 partition
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
     val producer = new Producer[String, String](config)
     try {
       // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0, but
       // since partition 0 can exist on any of the two brokers, we need to fetch from both brokers
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(1000)
-      // cross check if one of the brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", (messageSet1.hasNext || messageSet2.hasNext))
+      // get the leader
+      val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
+      assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
+      val leader = leaderOpt.get
 
-      if(messageSet1.hasNext) {
-        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-        assertTrue("Message set should have 1 message", messageSet1.hasNext)
-        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      val messageSet = if(leader == server1.config.brokerId) {
+        val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+        response1.messageSet("new-topic", 0).iterator
+      }else {
+        val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+        response2.messageSet("new-topic", 0).iterator
       }
-      else {
-        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
-        assertTrue("Message set should have 1 message", messageSet2.hasNext)
-        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
-      }
+      assertTrue("Message set should have 1 message", messageSet.hasNext)
+
+      assertEquals(new Message("test1".getBytes), messageSet.next.message)
+      assertTrue("Message set should have 1 message", messageSet.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet.next.message)
     } catch {
       case e: Exception => fail("Not expected", e)
     } finally {
@@ -178,65 +177,6 @@
   }
 
   @Test
-  def testZKSendToExistingTopicWithNoBrokers() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
-    val config = new ProducerConfig(props)
-
-    val producer = new Producer[String, String](config)
-    var server: KafkaServer = null
-
-    // create topic
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-
-    try {
-      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-      // all partitions have broker 0 as the leader.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-      // cross check if brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.next.message)
-
-      // shutdown server2
-      server2.shutdown
-      server2.awaitShutdown()
-      Thread.sleep(100)
-      // delete the new-topic logs
-      Utils.rm(server2.config.logDir)
-      Thread.sleep(100)
-      // start it up again. So broker 2 exists under /broker/ids, but nothing exists under /broker/topics/new-topic
-      val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-      val config2 = new KafkaConfig(props2) {
-        override val numPartitions = 4
-      }
-      server = TestUtils.createServer(config2)
-      Thread.sleep(100)
-
-      // now there are no brokers registered under test-topic.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-
-      // cross check if brokers got the messages
-      val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet2.next.message)
-
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    } finally {
-      if(server != null) server.shutdown
-      producer.close
-    }
-  }
-
-  @Test
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val timeoutMs = 500
     val props = new Properties()
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -51,6 +51,7 @@
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
 
     servers ++= List(server1, server2)
+    try {
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -59,7 +60,7 @@
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -68,7 +69,7 @@
     servers.head.shutdown()
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     Thread.sleep(zookeeper.tickTime)
@@ -85,9 +86,12 @@
 
     // test if the leader is the preferred replica
     assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+    }finally {
     // shutdown the servers and delete data hosted on them
     servers.map(server => server.shutdown())
     servers.map(server => Utils.rm(server.config.logDir))
+
+    }
   }
 
   // Assuming leader election happens correctly, test if epoch changes as expected
@@ -105,17 +109,17 @@
 
       var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
       assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
-      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
 
     }finally {
       TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
Index: core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(revision 0)
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import collection.mutable.HashMap
+import collection.mutable.Map
+import java.util.concurrent.PriorityBlockingQueue
+import kafka.cluster.{Partition, Replica}
+import org.easymock.EasyMock
+import junit.framework.Assert._
+import kafka.log.Log
+import kafka.utils.{Time, MockTime, TestUtils}
+
+class ISRExpirationTest extends JUnit3Suite{ // drives an ISRExpirationThread with MockTime and EasyMock logs and checks the ISR it reports back
+
+  var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() // last ISR passed to updateISRForPartition, keyed by (topic, partition)
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val keepInSyncTime = 100L
+  })
+  val topic = "foo"
+
+  def testISRExpirationForSinglePartition() {
+    val time = new MockTime
+    val partitions = new PriorityBlockingQueue[Partition](100, new PartitionHWComparator)
+    val isrExpirationThread =  new ISRExpirationThread("ISRExpirationThread", configs.head, time, partitions,
+                                                       updateISRForPartition)
+    // create leader replica
+    val log = getLogWithHW(5L)
+    // add one partition
+    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 5L)
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // set remote replicas leo to something low, like 2
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
+    partitions.add(partition0)
+    isrExpirationThread.start()
+
+    // sleep for config.keepInSyncTime
+    time.sleep(configs.head.keepInSyncTime)
+    isrExpirationThread.signal()
+    Thread.sleep(10) // NOTE(review): real-time wait, presumably to let the expiration thread react to signal() - timing-sensitive
+
+    time.sleep(configs.head.keepInSyncTime)
+
+    val partition0ISR = topicPartitionISR.get((topic, partition0.partitionId))
+    assertTrue("ISR must exist for topic foo partition 0", partition0ISR.isDefined)
+    assertEquals(List(0), partition0ISR.get)
+
+    isrExpirationThread.shutdown() // NOTE(review): not reached if an assertion above fails (no try/finally in this test)
+    EasyMock.verify(log)
+  }
+
+  def testISRExpirationForMultiplePartitions() {
+    val time = new MockTime
+    val partitions = new PriorityBlockingQueue[Partition](100, new PartitionHWComparator)
+    val isrExpirationThread =  new ISRExpirationThread("ISRExpirationThread", configs.head, time, partitions,
+      updateISRForPartition)
+    try {
+      // create leader replica
+      val log0 = EasyMock.createMock(classOf[kafka.log.Log])
+      EasyMock.expect(log0.getLogEndOffset).andReturn(5L).times(2)
+      EasyMock.expect(log0.getHighwaterMark).andReturn(5L).times(6)
+      EasyMock.expect(log0.setHW(5L)).times(3)
+      EasyMock.replay(log0)
+
+      val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log0, 5L)
+      val leaderReplicaPartition0 = partition0.getReplica(configs.head.brokerId).get
+      // set the leo for non-leader replicas to something low
+      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+
+      // add 2nd partition
+      val log1 = EasyMock.createMock(classOf[kafka.log.Log])
+      EasyMock.expect(log1.getHighwaterMark).andReturn(15L).times(3)
+      EasyMock.expect(log1.getLogEndOffset).andReturn(15L).times(1)
+      EasyMock.expect(log1.setHW(15L)).times(2)
+      EasyMock.replay(log1)
+
+      time.sleep(200)
+
+      val partition1 = getPartitionWithAllReplicasInISR(topic, 1, time, configs.head.brokerId, log1, 15L)
+      val leaderReplicaPartition1 = partition1.getReplica(configs.head.brokerId).get
+      // set the leo for non-leader replicas to something low
+      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 12))
+
+      // add partitions to priority queue
+      partitions.add(partition0)
+      partitions.add(partition1)
+      // start the isr expiration thread
+      isrExpirationThread.start()
+      Thread.sleep(10)
+
+      // check that partition 0 is the head of the queue
+      assertEquals("Priority queue should have 1 partition", 1, partitions.size())
+      assertTrue("ISR for topic %s partition %d should be defined".format(topic, 0), topicPartitionISR.get((topic, 0)).isDefined)
+      assertEquals(List(0) , topicPartitionISR.get((topic, 0)).get)
+
+      time.sleep(configs.head.keepInSyncTime)
+      // expire partition 0
+      isrExpirationThread.signal()
+
+      Thread.sleep(100)
+      assertTrue("ISR for topic %s partition %d should be defined".format(topic, 1), topicPartitionISR.get((topic, 1)).isDefined)
+      assertEquals(List(0) , topicPartitionISR.get((topic, 1)).get)
+      EasyMock.verify(log0)
+      EasyMock.verify(log1)
+      // no catch clause: a catch-all here only printed the stack trace and swallowed assertion
+      // failures, so this test could never fail; the thread is still stopped in the finally block
+    }finally {
+       isrExpirationThread.shutdown()
+    }
+  }
+
+  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
+                                               localLog: Log, leaderHW: Long): Partition = {
+    val partition = new Partition(topic, partitionId, time)
+    val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+
+    val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
+    partition.assignedReplicas(Some(allReplicas.toSet))
+    // set in sync replicas for this partition to all the assigned replicas
+    partition.inSyncReplicas = allReplicas.toSet
+    // set the leader and its hw and the hw update time
+    partition.leaderId(Some(leaderId))
+    partition.leaderHW(Some(leaderHW))
+    partition
+  }
+
+  private def getLogWithHW(hw: Long): Log = { // mock Log in replay mode; the expected call counts below are checked by EasyMock.verify in the caller
+    val log1 = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log1.getLogEndOffset).andReturn(hw).times(1)
+    EasyMock.expect(log1.getHighwaterMark).andReturn(hw).times(3)
+    EasyMock.expect(log1.setHW(hw)).times(2)
+    EasyMock.replay(log1)
+
+    log1
+  }
+
+  private def updateISRForPartition(topic: String, partition: Int, isr: Seq[Int], leader: Option[Replica] = None) { // callback handed to ISRExpirationThread; leader is ignored
+    topicPartitionISR += (topic,  partition) -> isr
+  }
+
+  private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = { // one log-less Replica per configured broker other than the leader
+    configs.filter(_.brokerId != leaderId).map { config =>
+      new Replica(config.brokerId, partition, topic)
+    }
+  }
+}
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala	(revision 1340152)
+++ core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala	(working copy)
@@ -20,7 +20,6 @@
 import scala.collection._
 import org.junit.{After, Before, Test}
 import junit.framework.Assert._
-import kafka.server._
 import kafka.message._
 import kafka.api._
 import kafka.utils.TestUtils
Index: core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(revision 0)
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import kafka.producer.ProducerData
+import kafka.serializer.StringEncoder
+import kafka.admin.CreateTopicCommand
+import kafka.cluster.{Replica, Partition, Broker}
+import kafka.log.InMemoryLog
+import kafka.utils.{MockTime, TestUtils}
+import junit.framework.Assert._
+
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
+  val props = createBrokerConfigs(2)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  var brokers: Seq[KafkaServer] = null
+  val topic = "foobar"
+
+  override def setUp() {
+    super.setUp()
+    brokers = configs.map(config => TestUtils.createServer(config))
+  }
+
+  override def tearDown() {
+    brokers.foreach(_.shutdown()) // reverse of setUp: stop the brokers before the harness is torn down
+    super.tearDown()
+  }
+
+  def testReplicaFetcherThread() {
+    val partition = 0
+    val testMessageList = List("test1", "test2", "test3", "test4")
+    val leaderBrokerId = configs.head.brokerId // first configured broker acts as the leader
+    val followerBrokerId = configs.last.brokerId // last configured broker acts as the follower
+    val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
+
+    // create a topic and partition
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+
+    // send test messages to leader, then release the producer's resources
+    val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
+    producer.send(new ProducerData[String, String](topic, "test", testMessageList))
+    producer.close()
+
+    // the follower's replica log is kept in memory
+    val replicaLog = new InMemoryLog(topic, partition, 500, 500, false)
+
+    // create replica fetch thread
+    val time = new MockTime
+    val testPartition = new Partition(topic, partition, time)
+    testPartition.leaderId(Some(leaderBrokerId))
+    val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog))
+    val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last)
+
+    // start a replica fetch thread to the above broker
+    replicaFetchThread.start()
+
+    Thread.sleep(700) // fixed wait for the fetcher to copy the messages; timing-based
+    replicaFetchThread.shutdown()
+
+    assertEquals(60L, testReplica.log.get.getLogEndOffset) // InMemoryLog reports the total appended bytes here
+    replicaLog.close()
+  }
+}
\ No newline at end of file
Index: core/src/test/scala/other/kafka/inmemory/InMemoryLog.scala
===================================================================
--- core/src/test/scala/other/kafka/inmemory/InMemoryLog.scala	(revision 0)
+++ core/src/test/scala/other/kafka/inmemory/InMemoryLog.scala	(revision 0)
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.log
+
+import kafka.api.OffsetRequest
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+import collection.mutable.ListBuffer
+import kafka.utils.{Utils, Logging}
+import kafka.message._
+
+class InMemoryLog(val topic: String, val partition: Int, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+  extends Log with Logging { // Log implementation for tests that keeps every appended message in memory
+  /* Lock taken by flush(); append() does not acquire it */
+  private val lock = new Object
+  /* Running total of sizeInBytes of all appended message sets; returned by getLogEndOffset and nextAppendOffset */
+  private val setSize = new AtomicLong()
+  /* Value returned by getHighwaterMark; only flush() writes it */
+  private val setHighWaterMark = new AtomicLong()
+
+  /* NOTE(review): nothing in this class increments this counter, so flush() always returns at its first line */
+  private val unflushed = new AtomicInteger(0)
+
+   /* last time it was flushed (initialised to the construction time) */
+  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
+
+  /* Every message appended so far, in append order */
+  val messages = new ListBuffer[MessageAndOffset]()
+  /* NOTE(review): assigned by setHW but never read in this class */
+  private var hw: Long = 0
+
+  private val logStats = new LogStats(this)
+
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + name)
+  /* Log name: the topic and the partition joined by a dash */
+  def name: String = topic + "-" + partition
+
+  def getTopicName: String = topic
+  /* Stays at its initial value unless flush() runs past its early return */
+  def getHighwaterMark: Long = setHighWaterMark.get()
+  /* There are no segment files; a constant 1 is reported */
+  def numberOfSegments: Int = 1
+  /* Nothing to release */
+  def close(): Unit = {}
+  /* Validates the message set, then stores its messages and adds its byte size to the log end offset */
+  def append(newMessages: MessageSet): Unit = {
+    // validate the messages
+    var numberOfMessages = 0
+    for(messageAndOffset <- newMessages) {
+      if(!messageAndOffset.message.isValid)
+        throw new InvalidMessageException()
+      numberOfMessages += 1;
+    }
+
+    logStats.recordAppendedMessages(numberOfMessages)
+
+    val iter = newMessages.iterator
+    while(iter.hasNext) {
+      messages += iter.next()
+    }
+    info("Appended %d messages to in memory log for topic %s partition %d".format(numberOfMessages, topic, partition))
+    setSize.getAndAdd(newMessages.sizeInBytes)
+    info("New log end offset for topic %s partition %d is %d".format(topic, partition, setSize.get()))
+  }
+  /* Ignores offset and length: always returns a new message set containing every stored message */
+  def read(offset: Long, length: Int): MessageSet = {
+    new ByteBufferMessageSet(messages.map(_.message): _*)
+  }
+  /* Byte size of a message set rebuilt from all stored messages */
+  def size: Long = new ByteBufferMessageSet(messages.map(_.message): _*).sizeInBytes
+
+  def getLogEndOffset: Long = setSize.get()
+
+  def nextAppendOffset: Long = setSize.get()
+  /* Ignores the request: returns a one-element array holding 0 */
+  def getOffsetsBefore(request: OffsetRequest): Array[Long] = { new Array[Long](1) }
+
+  def setHW(latestLeaderHW: Long) {
+    hw = latestLeaderHW
+  }
+  /* No-op: there is nothing to persist */
+  def checkpointHW(): Unit = {}
+
+  def flush(): Unit = {
+    if (unflushed.get == 0) return
+
+    lock synchronized {
+      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+          System.currentTimeMillis)
+      setHighWaterMark.set(size)
+      unflushed.set(0)
+      lastflushedTime.set(System.currentTimeMillis)
+      checkpointHW()
+     }
+  }
+
+  def getLastFlushedTime: Long = lastflushedTime.get()
+  /* No-op */
+  def recoverUptoLastCheckpointedHW(): Unit = {}
+  /* Never marks anything: the predicate is ignored and an empty sequence is returned */
+  def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = { Seq.empty[LogSegment] }
+
+}
\ No newline at end of file
Index: core/src/test/scala/other/kafka/TestLogPerformance.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLogPerformance.scala	(revision 1340152)
+++ core/src/test/scala/other/kafka/TestLogPerformance.scala	(working copy)
@@ -30,7 +30,7 @@
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, false)
+    val log = new DiskLog(dir, 50*1024*1024, 5000000, false)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)
Index: core/src/main/scala/kafka/log/SegmentList.scala
===================================================================
--- core/src/main/scala/kafka/log/SegmentList.scala	(revision 1340152)
+++ core/src/main/scala/kafka/log/SegmentList.scala	(working copy)
@@ -72,8 +72,30 @@
     }
     deleted
   }
-  
+
   /**
+   * Delete the items from position (newEnd + 1) until the end of the list and return them; items 0 to newEnd are kept
+   */
+  def truncLast(newEnd: Int): Seq[T] = {
+    if(newEnd >= contents.get().size-1) // newEnd must leave at least one trailing item to delete
+      throw new IllegalArgumentException("End index must be less than segment list size - 1")
+    var deleted: Array[T] = null
+    var done = false
+    while(!done) { // retry until the compare-and-set against the snapshot succeeds
+      val curr = contents.get()
+      val newLength = newEnd + 1 // items 0..newEnd are retained
+      val updated = new Array[T](newLength)
+      Array.copy(curr, 0, updated, 0, newLength)
+      if(contents.compareAndSet(curr, updated)) {
+        deleted = new Array[T](curr.length - newLength)
+        Array.copy(curr, newLength, deleted, 0, curr.length - newLength) // the items after newEnd are returned
+        done = true
+      }
+    }
+    deleted
+  }
+
+  /**
    * Get a consistent view of the sequence
    */
   def view: Array[T] = contents.get()
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1333546)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -13,21 +13,56 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package kafka.log
 
-import java.util.concurrent.atomic._
-import java.text.NumberFormat
-import java.io._
-import kafka.message._
-import kafka.utils._
-import kafka.common._
+import kafka.message.MessageSet
 import kafka.api.OffsetRequest
-import java.util._
+import kafka.common.OffsetOutOfRangeException
+import java.text.NumberFormat
+import kafka.utils.Range
 
-private[kafka] object Log {
+trait Log {
+  /** The name of this log */
+  def name: String
+  /** The topic this log belongs to */
+  def getTopicName: String
+  /** The current high watermark of the log */
+  def getHighwaterMark: Long
+  /** The number of segments in the log */
+  def numberOfSegments: Int
+  /** Close this log */
+  def close(): Unit
+  /** Append the given message set to the log */
+  def append(messages: MessageSet): Unit
+  /** Read a message set starting at the given offset; `length` bounds the read (presumably in bytes, confirm) */
+  def read(offset: Long, length: Int): MessageSet
+  /** The size of the log in bytes */
+  def size: Long
+  /** The end offset of the log */
+  def getLogEndOffset: Long
+  /** The byte offset of the message that will be appended next */
+  def nextAppendOffset: Long
+  /** Offsets selected for the given offset request; the selection rules are left to the implementation */
+  def getOffsetsBefore(request: OffsetRequest): Array[Long]
+  /** Record the latest high watermark of the leader */
+  def setHW(latestLeaderHW: Long)
+  /** Checkpoint the high watermark (presumably persists it; confirm against the implementations) */
+  def checkpointHW(): Unit
+  /** Flush pending appends */
+  def flush(): Unit
+  /** The time of the last flush */
+  def getLastFlushedTime: Long
+  /** NOTE(review): presumably recovers the log up to the last checkpointed high watermark; confirm */
+  def recoverUptoLastCheckpointedHW(): Unit
+  /** Mark the segments selected by the predicate as deleted and return them */
+  def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment]
+}
+
+object Log {
   val FileSuffix = ".kafka"
+  val hwFileName = "highwatermark"
 
   /**
    * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
@@ -75,297 +110,15 @@
     nf.setMinimumIntegerDigits(20)
     nf.setMaximumFractionDigits(0)
     nf.setGroupingUsed(false)
-    nf.format(offset) + Log.FileSuffix
+    nf.format(offset) + FileSuffix
   }
-  
+
   def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
     if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
       return Array(0L)
     else
       return Array()
   }
-}
 
-/**
- * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size 
- */
-private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
-  @volatile var deleted = false
-  def size: Long = messageSet.highWaterMark
-  override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
 }
 
-
-/**
- * An append-only log for storing messages. 
- */
-@threadsafe
-private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
-
-  /* A lock that guards all modifications to the log */
-  private val lock = new Object
-
-  /* The current number of unflushed messages appended to the write */
-  private val unflushed = new AtomicInteger(0)
-
-   /* last time it was flushed */
-  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
-
-  /* The actual segments of the log */
-  private[log] val segments: SegmentList[LogSegment] = loadSegments()
-
-  /* The name of this log */
-  val name  = dir.getName()
-
-  private val logStats = new LogStats(this)
-
-  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
-
-  /* Load the log segments from the log files on disk */
-  private def loadSegments(): SegmentList[LogSegment] = {
-    // open all the segments read-only
-    val accum = new ArrayList[LogSegment]
-    val ls = dir.listFiles()
-    if(ls != null) {
-      for(file <- ls if file.isFile && file.toString.endsWith(Log.FileSuffix)) {
-        if(!file.canRead)
-          throw new IOException("Could not read file " + file)
-        val filename = file.getName()
-        val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
-        val messageSet = new FileMessageSet(file, false)
-        accum.add(new LogSegment(file, messageSet, start))
-      }
-    }
-
-    if(accum.size == 0) {
-      // no existing segments, create a new mutable segment
-      val newFile = new File(dir, Log.nameFromOffset(0))
-      val set = new FileMessageSet(newFile, true)
-      accum.add(new LogSegment(newFile, set, 0))
-    } else {
-      // there is at least one existing segment, validate and recover them/it
-      // sort segments into ascending order for fast searching
-      Collections.sort(accum, new Comparator[LogSegment] {
-        def compare(s1: LogSegment, s2: LogSegment): Int = {
-          if(s1.start == s2.start) 0
-          else if(s1.start < s2.start) -1
-          else 1
-        }
-      })
-      validateSegments(accum)
-
-      //make the final section mutable and run recovery on it if necessary
-      val last = accum.remove(accum.size - 1)
-      last.messageSet.close()
-      info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
-      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
-      accum.add(mutable)
-    }
-    new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
-  }
-
-  /**
-   * Check that the ranges and sizes add up, otherwise we have lost some data somewhere
-   */
-  private def validateSegments(segments: ArrayList[LogSegment]) {
-    lock synchronized {
-      for(i <- 0 until segments.size - 1) {
-        val curr = segments.get(i)
-        val next = segments.get(i+1)
-        if(curr.start + curr.size != next.start)
-          throw new IllegalStateException("The following segments don't validate: " +
-                  curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
-      }
-    }
-  }
-
-  /**
-   * The number of segments in the log
-   */
-  def numberOfSegments: Int = segments.view.length
-
-  /**
-   * Close this log
-   */
-  def close() {
-    debug("Closing log " + name)
-    lock synchronized {
-      for(seg <- segments.view)
-        seg.messageSet.close()
-    }
-  }
-
-  /**
-   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
-   * Returns the offset at which the messages are written.
-   */
-  def append(messages: MessageSet): Unit = {
-    // validate the messages
-    var numberOfMessages = 0
-    for(messageAndOffset <- messages) {
-      if(!messageAndOffset.message.isValid)
-        throw new InvalidMessageException()
-      numberOfMessages += 1;
-    }
-
-    logStats.recordAppendedMessages(numberOfMessages)
-    
-    // they are valid, insert them in the log
-    lock synchronized {
-      try {
-        val segment = segments.view.last
-        segment.messageSet.append(messages)
-        maybeFlush(numberOfMessages)
-        maybeRoll(segment)
-      }
-      catch {
-        case e: IOException =>
-          fatal("Halting due to unrecoverable I/O error while handling producer request", e)
-          Runtime.getRuntime.halt(1)
-        case e2 => throw e2
-      }
-    }
-  }
-
-  /**
-   * Read from the log file at the given offset
-   */
-  def read(offset: Long, length: Int): MessageSet = {
-    val view = segments.view
-    Log.findRange(view, offset, view.length) match {
-      case Some(segment) => segment.messageSet.read((offset - segment.start), length)
-      case _ => MessageSet.Empty
-    }
-  }
-
-  /**
-   * Delete any log segments matching the given predicate function
-   */
-  def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
-    lock synchronized {
-      val view = segments.view
-      val deletable = view.takeWhile(predicate)
-      for(seg <- deletable)
-        seg.deleted = true
-      val numToDelete = deletable.size
-      // if we are deleting everything, create a new empty segment
-      if(numToDelete == view.size)
-        roll()
-      segments.trunc(numToDelete)
-    }
-  }
-
-  /**
-   * Get the size of the log in bytes
-   */
-  def size: Long =
-    segments.view.foldLeft(0L)(_ + _.size)
-
-  /**
-   * The byte offset of the message that will be appended next.
-   */
-  def nextAppendOffset: Long = {
-    flush
-    val last = segments.view.last
-    last.start + last.size
-  }
-
-  /**
-   *  get the current high watermark of the log
-   */
-  def getHighwaterMark: Long = segments.view.last.messageSet.highWaterMark
-
-  /**
-   * Roll the log over if necessary
-   */
-  private def maybeRoll(segment: LogSegment) {
-    if(segment.messageSet.sizeInBytes > maxSize)
-      roll()
-  }
-
-  /**
-   * Create a new segment and make it active
-   */
-  def roll() {
-    lock synchronized {
-      val last = segments.view.last
-      val newOffset = nextAppendOffset
-      val newFile = new File(dir, Log.nameFromOffset(newOffset))
-      debug("Rolling log '" + name + "' to " + newFile.getName())
-      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
-    }
-  }
-
-  /**
-   * Flush the log if necessary
-   */
-  private def maybeFlush(numberOfMessages : Int) {
-    if(unflushed.addAndGet(numberOfMessages) >= flushInterval) {
-      flush()
-    }
-  }
-
-  /**
-   * Flush this log file to the physical disk
-   */
-  def flush() : Unit = {
-    if (unflushed.get == 0) return
-
-    lock synchronized {
-      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
-          System.currentTimeMillis)
-      segments.view.last.messageSet.flush()
-      unflushed.set(0)
-      lastflushedTime.set(System.currentTimeMillis)
-     }
-  }
-
-  def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
-    val segsArray = segments.view
-    var offsetTimeArray: Array[Tuple2[Long, Long]] = null
-    if (segsArray.last.size > 0)
-      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
-    else
-      offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
-
-    for (i <- 0 until segsArray.length)
-      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
-    if (segsArray.last.size > 0)
-      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
-
-    var startIndex = -1
-    request.time match {
-      case OffsetRequest.LatestTime =>
-        startIndex = offsetTimeArray.length - 1
-      case OffsetRequest.EarliestTime =>
-        startIndex = 0
-      case _ =>
-          var isFound = false
-          debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
-          startIndex = offsetTimeArray.length - 1
-          while (startIndex >= 0 && !isFound) {
-            if (offsetTimeArray(startIndex)._2 <= request.time)
-              isFound = true
-            else
-              startIndex -=1
-          }
-    }
-
-    val retSize = request.maxNumOffsets.min(startIndex + 1)
-    val ret = new Array[Long](retSize)
-    for (j <- 0 until retSize) {
-      ret(j) = offsetTimeArray(startIndex)._1
-      startIndex -= 1
-    }
-    ret
-  }
- 
-  def getTopicName():String = {
-    name.substring(0, name.lastIndexOf("-"))
-  }
-
-  def getLastFlushedTime():Long = {
-    return lastflushedTime.get
-  }
-}
-  
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1340152)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -24,6 +24,7 @@
 import kafka.server.KafkaConfig
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
+import kafka.log.Log._
 
 /**
  * The guy who creates and hands out logs
@@ -64,7 +65,7 @@
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val log = new DiskLog(dir, maxSize, flushInterval, needRecovery)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartion._1)
@@ -113,7 +114,7 @@
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false)
+      new DiskLog(d, maxSize, flushInterval, false)
     }
   }
 
@@ -143,7 +144,7 @@
     val log = getLog(offsetRequest.topic, offsetRequest.partition)
     log match {
       case Some(l) => l.getOffsetsBefore(offsetRequest)
-      case None => Log.getEmptyOffsets(offsetRequest)
+      case None => getEmptyOffsets(offsetRequest)
     }
   }
 
@@ -207,7 +208,7 @@
   /* Runs through the log removing segments older than a certain age */
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
-    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val topic = Utils.getTopicPartition(log.name)._1
     val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
     val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
     val total = deleteSegments(log, toBeDeleted)
Index: core/src/main/scala/kafka/log/DiskLog.scala
===================================================================
--- core/src/main/scala/kafka/log/DiskLog.scala	(revision 0)
+++ core/src/main/scala/kafka/log/DiskLog.scala	(revision 0)
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import kafka.api.OffsetRequest
+import kafka.log.Log._
+import java.io.{IOException, RandomAccessFile, File}
+import java.util.{Comparator, Collections, ArrayList}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
+import kafka.message.{MessageSet, InvalidMessageException, FileMessageSet}
+import kafka.utils._
+
+/**
+ * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
+ */
+class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+  @volatile var deleted = false // set to true by DiskLog.markDeletedWhile for segments selected for deletion
+  def size: Long = messageSet.highWaterMark // the size is whatever the message set reports as its high water mark
+  override def toString() = List("(file=", file, ", start=", start, ", size=", size, ")").mkString
+}
+
+
+/**
+ * An append-only log for storing messages. 
+ */
+@threadsafe
+private[kafka] class DiskLog(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+  extends Log with Logging {
+
+  /* A lock that guards all modifications to the log */
+  private val lock = new Object
+
+  /* The number of messages appended since the last flush (increased in maybeFlush, reset in flush) */
+  private val unflushed = new AtomicInteger(0)
+
+   /* last time it was flushed (initialised to the construction time) */
+  private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
+
+  /* The actual segments of the log; loading synchronizes on lock, so lock has to be declared before this field */
+  private[log] val segments: SegmentList[LogSegment] = loadSegments()
+
+
+  /* File handle for the high watermark file, which lives in the log directory under the name Log.hwFileName */
+  private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
+  /* NOTE(review): presumably the high watermark value kept alongside hwFile; its uses are outside this excerpt */
+  private var hw: Long = 0
+  /* Statistics for this log, registered below as an MBean named after the log directory */
+  private val logStats = new LogStats(this)
+
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
+
+  /* The name of this log: the name of its directory */
+  def name  = dir.getName()
+
+  /* Load the log segments from the log files on disk */
+  private def loadSegments(): SegmentList[LogSegment] = {
+    // open all the existing segment files read-only first
+    val logSegments = new ArrayList[LogSegment]
+    val ls = dir.listFiles() // java.io.File.listFiles returns null when the path cannot be listed
+    if(ls != null) {
+      for(file <- ls if file.isFile && file.toString.endsWith(FileSuffix)) {
+        if(!file.canRead)
+          throw new IOException("Could not read file " + file)
+        val filename = file.getName()
+        val start = filename.substring(0, filename.length - FileSuffix.length).toLong // start offset is the file name minus the suffix
+        val messageSet = new FileMessageSet(file, false)
+        logSegments.add(new LogSegment(file, messageSet, start))
+      }
+    }
+
+    if(logSegments.size == 0) {
+      // no existing segments, create a new mutable segment starting at offset 0
+      val newFile = new File(dir, nameFromOffset(0))
+      val set = new FileMessageSet(newFile, true)
+      logSegments.add(new LogSegment(newFile, set, 0))
+    } else {
+      // there is at least one existing segment, validate and recover them/it
+      // sort segments into ascending order of start offset for fast searching
+      Collections.sort(logSegments, new Comparator[LogSegment] {
+        def compare(s1: LogSegment, s2: LogSegment): Int = {
+          if(s1.start == s2.start) 0
+          else if(s1.start < s2.start) -1
+          else 1
+        }
+      })
+      validateSegments(logSegments)
+
+      // the last segment was opened read-only above: close it and reopen it mutable, with recovery if requested
+      val last = logSegments.remove(logSegments.size - 1)
+      last.messageSet.close()
+      info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
+      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
+      logSegments.add(mutable)
+    }
+    new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
+  }
+
+  /**
+   * Check that the ranges and sizes add up, otherwise we have lost some data somewhere
+   */
+  private def validateSegments(segments: ArrayList[LogSegment]) {
+    lock synchronized {
+      // walk adjacent pairs: each segment must end exactly where its successor starts
+      for(idx <- 1 until segments.size) {
+        val (previous, following) = (segments.get(idx - 1), segments.get(idx))
+        if(previous.start + previous.size != following.start)
+          throw new IllegalStateException("The following segments don't validate: " +
+                  previous.file.getAbsolutePath() + ", " + following.file.getAbsolutePath())
+      }
+    }
+  }
+
+  /**
+   * The number of segments in the log
+   */
+  def numberOfSegments: Int = segments.view.size // element count of the current snapshot of the segment array
+
+  /**
+   * Close this log
+   */
+  def close() {
+    debug("Closing log " + name)
+    lock synchronized {
+      try {
+        segments.view.foreach(_.messageSet.close()) // close the message set of every segment
+        checkpointHW() // kept ahead of hwFile.close(), as in the original ordering
+      } finally hwFile.close() // release the file handle even if a close or the checkpoint above throws
+    }
+  }
+
+  /**
+   * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+   * Does not return a value; every message is validated before anything is written.
+   */
+  def append(messages: MessageSet): Unit = {
+    // count the messages, rejecting the whole set on the first invalid one
+    var validCount = 0
+    messages.foreach { entry =>
+      if(!entry.message.isValid)
+        throw new InvalidMessageException()
+      validCount += 1
+    }
+
+    logStats.recordAppendedMessages(validCount)
+
+    // all messages validated: write to the active (last) segment under the log lock
+    lock synchronized {
+      try {
+        val active = segments.view.last
+        active.messageSet.append(messages)
+        maybeFlush(validCount)
+        maybeRoll(active)
+      }
+      catch {
+        // an I/O failure is treated as unrecoverable: log it and halt the JVM; anything else propagates
+        case ioError: IOException =>
+          fatal("Halting due to unrecoverable I/O error while handling producer request", ioError)
+          Runtime.getRuntime.halt(1)
+      }
+    }
+  }
+
+  /**
+   * Read from the log file at the given offset
+   */
+  def read(offset: Long, length: Int): MessageSet = {
+    val snapshot = segments.view
+    // locate the segment whose range contains the offset, then read relative to that segment's start
+    Log.findRange(snapshot, offset, snapshot.length).map { found =>
+      found.messageSet.read(offset - found.start, length)
+    }.getOrElse(MessageSet.Empty)
+  }
+
+  /**
+   * Delete any log segments matching the given predicate function
+   */
+  def markDeletedWhile(predicate: LogSegment => Boolean): Seq[LogSegment] = {
+    lock synchronized {
+      val snapshot = segments.view
+      // only the leading run of segments that satisfy the predicate is eligible
+      val doomed = snapshot.takeWhile(predicate)
+      doomed.foreach(seg => seg.deleted = true)
+      val doomedCount = doomed.size
+      // when every segment is going away, roll first so the log still has an active segment
+      if(doomedCount == snapshot.size)
+        roll()
+      segments.trunc(doomedCount)
+    }
+  }
+
+  /**
+   * Get the size of the log in bytes
+   */
+  def size: Long = // total of the per-segment sizes in the current snapshot
+    segments.view.map(_.size).sum
+
+  /**
+   * The byte offset of the message that will be appended next.
+   */
+  def nextAppendOffset: Long = {
+    flush() // flush first; presumably so the segment size read below covers everything appended so far
+    val active = segments.view.last
+    active.start + active.size
+  }
+
+  /**
+   *  get the current high watermark of the log
+   */
+  def getHighwaterMark: Long = { val active = segments.view.last; active.messageSet.highWaterMark }
+
+  /**
+   *  get the offset of the last message in the log
+   */
+  def getLogEndOffset: Long = { val active = segments.view.last; active.messageSet.getEndOffset() }
+
+  /**
+   * Roll the log over if necessary
+   */
+  private def maybeRoll(segment: LogSegment) {
+    val segmentFull = segment.messageSet.sizeInBytes > maxSize // strictly larger than the configured maximum
+    if(segmentFull) roll()
+  }
+
+  /**
+   * Create a new segment and make it active
+   */
+  def roll() {
+    lock synchronized {
+      // the new segment starts at the offset at which the next message would be appended
+      val newOffset = nextAppendOffset
+      val newFile = new File(dir, nameFromOffset(newOffset))
+      debug("Rolling log '" + name + "' to " + newFile.getName())
+      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+    }
+  }
+
+  /**
+   * Count the newly appended messages and flush once at least flushInterval of them are unflushed
+   */
+  private def maybeFlush(numberOfMessages : Int) {
+    val pending = unflushed.addAndGet(numberOfMessages)
+    if(pending >= flushInterval)
+      flush()
+  }
+
+  /**
+   * Force the last segment to disk, reset the unflushed counter and checkpoint the high watermark
+   */
+  def flush() : Unit = {
+    // nothing has been counted as unflushed since the last flush, so there is
+    // no need to take the lock
+    if (unflushed.get != 0) lock synchronized {
+      debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
+          System.currentTimeMillis)
+      segments.view.last.messageSet.flush()
+      unflushed.set(0)
+      lastflushedTime.set(System.currentTimeMillis)
+      checkpointHW()
+    }
+  }
+
+  /**
+   * Return offsets of segment starts (and of the end of a non-empty last segment), walking from newer to older
+   * entries. request.time selects the entry to start from: the last entry for OffsetRequest.LatestTime, the first
+   * for OffsetRequest.EarliestTime, otherwise the last entry whose time is <= request.time. At most
+   * request.maxNumOffsets offsets are returned.
+   */
+  def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
+    val segsArray = segments.view
+    // read once so that the array size and the trailing entry below cannot disagree
+    val lastSegmentHasData = segsArray.last.size > 0
+    val numEntries = if (lastSegmentHasData) segsArray.length + 1 else segsArray.length
+    val offsetTimeArray = new Array[Tuple2[Long, Long]](numEntries)
+
+    // each segment is represented by its start offset and the modification time of its file
+    for (i <- 0 until segsArray.length)
+      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
+    if (lastSegmentHasData)
+      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
+
+    var startIndex = -1
+    request.time match {
+      case OffsetRequest.LatestTime =>
+        startIndex = offsetTimeArray.length - 1
+      case OffsetRequest.EarliestTime =>
+        startIndex = 0
+      case _ =>
+        debug("Offset time array = " + offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("; "))
+        // stays -1 when every entry is newer than the requested time
+        startIndex = offsetTimeArray.lastIndexWhere(_._2 <= request.time)
+    }
+
+    val retSize = request.maxNumOffsets.min(startIndex + 1)
+    val ret = new Array[Long](retSize)
+    for (j <- 0 until retSize) {
+      ret(j) = offsetTimeArray(startIndex)._1
+      startIndex -= 1
+    }
+    ret
+  }
+
+  /**
+   * Truncate the log to the high watermark last checkpointed in hwFile. Nothing is truncated when hwFile is
+   * empty or when no segment contains the checkpointed value.
+   */
+  def recoverUptoLastCheckpointedHW() {
+    if(hwFile.length() > 0) {
+      // read the last checkpointed hw from disk
+      hwFile.seek(0)
+      val lastKnownHW = hwFile.readLong()
+      // find the log segment that has this hw; a segment covers [start, start + size)
+      segments.view.find(s => lastKnownHW >= s.start && lastKnownHW < s.start + s.size) match {
+        case Some(segment) =>
+          segments.truncLast(segments.view.indexOf(segment))
+          // the message set is positioned relative to the start of its segment
+          segment.messageSet.truncateUpto(lastKnownHW - segment.start)
+          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, lastKnownHW))
+        case None =>
+          // the hw lies outside every segment: before the log start or at/after its end
+          val last = segments.view.last
+          val logEndOffset = last.start + last.size
+          assert(lastKnownHW <= logEndOffset,
+            "Last checkpointed hw cannot be greater than the latest message in the log")
+          error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
+            .format(lastKnownHW, segments.view.head.start, logEndOffset))
+      }
+    }else
+      info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
+  }
+
+  def setHW(latestLeaderHW: Long) {  // in-memory only; checkpointHW() is what writes hw to hwFile
+    hw = latestLeaderHW
+  }
+
+  // Overwrite the value at the start of hwFile with the current hw and force the write to the storage device
+  def checkpointHW() {
+    hwFile.seek(0); hwFile.writeLong(hw)
+    hwFile.getChannel.force(true)
+  }
+
+  // The log name up to, but excluding, its last "-"
+  def getTopicName():String =
+    name.substring(0, name.lastIndexOf("-"))
+
+  // Value of lastflushedTime, which flush() sets from System.currentTimeMillis
+  def getLastFlushedTime():Long =
+    lastflushedTime.get
+}
+  
Index: core/src/main/scala/kafka/cluster/Replica.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Replica.scala	(revision 1340152)
+++ core/src/main/scala/kafka/cluster/Replica.scala	(working copy)
@@ -18,6 +18,87 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.common.InvalidPartitionException
+import kafka.server.{KafkaConfig, ReplicaFetcherThread}
+import java.lang.IllegalStateException
+import kafka.utils.Logging
 
-case class Replica(brokerId: Int, partition: Partition, topic: String,
-                   var log: Option[Log] = None, var hw: Long = -1, var leo: Long = -1, isLocal: Boolean = false)
\ No newline at end of file
+/**
+ * A replica of a topic partition hosted on broker brokerId. The replica is local when a log is attached to it.
+ */
+class Replica(val brokerId: Int, val partition: Partition, val topic: String,
+              var log: Option[Log] = None, var leoUpdateTime: Long = -1L) extends Logging {
+  // log end offset kept for a replica that has no local log; read and written through leo()
+  private var logEndOffset: Long = -1L
+  private var replicaFetcherThread: ReplicaFetcherThread = null
+
+  /**
+   * Get the log end offset or, when newLeo is defined, set it first. A local replica reads the value from its
+   * log and rejects updates with an IllegalStateException.
+   */
+  def leo(newLeo: Option[Long] = None): Long = {
+    (log, newLeo) match {
+      case (Some(_), Some(newOffset)) =>
+        throw new IllegalStateException("Trying to set the leo %d for local log".format(newOffset))
+      case (Some(localLog), None) => localLog.getLogEndOffset
+      case (None, Some(newOffset)) =>
+        logEndOffset = newOffset
+        logEndOffset
+      case (None, None) => logEndOffset
+    }
+  }
+
+  /** True when this replica has a log attached */
+  def isLocal: Boolean = log.isDefined
+
+  /**
+   * Get the high watermark of the local log or, when highwaterMarkOpt is defined, set it and return the new value.
+   * Throws InvalidPartitionException when this replica has no local log.
+   */
+  def hw(highwaterMarkOpt: Option[Long] = None): Long = {
+    log match {
+      case Some(localLog) =>
+        highwaterMarkOpt match {
+          case Some(highwaterMark) =>
+            trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
+                                                                                   brokerId, highwaterMark))
+            localLog.setHW(highwaterMark)
+            highwaterMark
+          case None => localLog.getHighwaterMark
+        }
+      case None =>
+        val action = if(highwaterMarkOpt.isDefined) "set" else "get"
+        throw new InvalidPartitionException("Unable to %s highwatermark for topic %s ".format(action, topic) +
+          "partition %d on broker %d, since there is no local log for this partition"
+            .format(partition.partitionId, brokerId))
+    }
+  }
+
+  /** Create and start a ReplicaFetcherThread for this replica against the given leader broker */
+  def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) {
+    val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId)
+    replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config)
+    replicaFetcherThread.start()
+  }
+
+  /** Shut the fetcher thread down if one was started */
+  def stopReplicaFetcherThread() {
+    if(replicaFetcherThread != null)
+      replicaFetcherThread.shutdown()
+  }
+
+  def close() {
+    stopReplicaFetcherThread()
+  }
+
+  override def toString(): String = {
+    val replicaString = new StringBuilder
+    replicaString.append("ReplicaId: " + brokerId)
+    replicaString.append("; Topic: " + topic)
+    replicaString.append("; Partition: " + partition.toString)
+    replicaString.append("; isLocal: " + isLocal)
+    // hw() throws for a replica without a local log, so it is only reported for a local replica
+    if(isLocal) replicaString.append("; Highwatermark: " + hw())
+    replicaString.toString()
+  }
+}
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1340152)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -16,12 +16,85 @@
  */
 package kafka.cluster
 
+import kafka.common.NoLeaderForPartitionException
+import kafka.utils.{SystemTime, Time, Logging}
+
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  * TODO: Commit queue to be added as part of KAFKA-46. Add AR, ISR, CUR, RAR state maintenance as part of KAFKA-302
  */
-case class Partition(topic: String, val partId: Int, var leader: Option[Replica] = None,
-                     assignedReplicas: Set[Replica] = Set.empty[Replica],
-                     inSyncReplicas: Set[Replica] = Set.empty[Replica],
-                     catchUpReplicas: Set[Replica] = Set.empty[Replica],
-                     reassignedReplicas: Set[Replica] = Set.empty[Replica])
+case class Partition(topic: String, partitionId: Int, time: Time = SystemTime,
+                     var inSyncReplicas: Set[Replica] = Set.empty[Replica],
+                     var catchUpReplicas: Set[Replica] = Set.empty[Replica],
+                     var reassignedReplicas: Set[Replica] = Set.empty[Replica]) extends Logging {
+  private var leaderReplicaId: Option[Int] = None
+  private var assignedReplicas: Set[Replica] = Set.empty[Replica]
+  private var highWatermarkUpdateTime: Long = -1L
+
+  /** Get the broker id of the leader or, when newLeader is defined, set it first */
+  def leaderId(newLeader: Option[Int] = None): Option[Int] = {
+    if(newLeader.isDefined) leaderReplicaId = newLeader
+    leaderReplicaId
+  }
+
+  /** Get the assigned replicas or, when replicas is defined, replace them first */
+  def assignedReplicas(replicas: Option[Set[Replica]] = None): Set[Replica] = {
+    replicas.foreach { ar => assignedReplicas = ar }
+    assignedReplicas
+  }
+
+  def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
+
+  /** Add the replica to the assigned replicas; returns false when it was already present */
+  def addReplica(replica: Replica): Boolean = {
+    val isNew = !assignedReplicas.contains(replica)
+    if(isNew)
+      assignedReplicas += replica
+    isNew
+  }
+
+  /** Record leo and the update time on the assigned replica with the same broker id; throws if there is none */
+  def updateReplicaLEO(replica: Replica, leo: Long) {
+    val replicaMetadata = getReplica(replica.brokerId).getOrElse(
+      throw new NoSuchElementException("Replica %d is not assigned to topic %s partition %d"
+        .format(replica.brokerId, topic, partitionId)))
+    replicaMetadata.leoUpdateTime = time.milliseconds
+    replicaMetadata.leo(Some(leo))
+    debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
+  }
+
+  /** The assigned replica hosted on the leader broker; throws when no leader is set or it is not assigned */
+  def leaderReplica(): Replica = {
+    leaderId() match {
+      case Some(leader) =>
+        getReplica(leader).getOrElse(
+          throw new IllegalStateException("No replica for leader %d in the replica manager".format(leader)))
+      case None => throw new NoLeaderForPartitionException("Leader for topic %s partition %d does not exist"
+        .format(topic, partitionId))
+    }
+  }
+
+  /** Get the leader replica's high watermark or, when newHw is defined, set it and note the update time */
+  def leaderHW(newHw: Option[Long] = None): Long = {
+    newHw match {
+      case Some(highWatermark) =>
+        leaderReplica().hw(newHw)
+        highWatermarkUpdateTime = time.milliseconds
+        highWatermark
+      case None => leaderReplica().hw()
+    }
+  }
+
+  def hwUpdateTime: Long = highWatermarkUpdateTime
+
+  override def toString(): String = {
+    val partitionString = new StringBuilder
+    partitionString.append("Topic: " + topic)
+    partitionString.append("; Partition: " + partitionId)
+    partitionString.append("; Leader: " + leaderId())
+    partitionString.append("; Assigned replicas: " + assignedReplicas().map(_.brokerId).mkString(","))
+    partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.append("; Reassigned replicas: " + reassignedReplicas.map(_.brokerId).mkString(","))
+    partitionString.toString()
+  }
+}
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1340152)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -55,14 +55,14 @@
       m.leader match {
         case Some(leader) =>
           val leaderReplica = new Replica(leader.id, partition, topic)
-          partition.leader = Some(leaderReplica)
+          partition.leaderId(Some(leaderReplica.brokerId))
           debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
           partition
         case None =>
           debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
           partition
       }
-    }.sortWith((s, t) => s.partId < t.partId)
+    }.sortWith((s, t) => s.partitionId < t.partitionId)
   }
 
   /**
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1340152)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -93,11 +93,8 @@
       val partitionIndex = getPartition(event.getKey, totalNumPartitions)
       val brokerPartition = topicPartitionsList(partitionIndex)
 
-      val leaderBrokerId = brokerPartition.leader match {
-        case Some(leader) => leader.brokerId
-        case None => -1
-        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-      }
+      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+      val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
 
       var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
       ret.get(leaderBrokerId) match {
@@ -108,7 +105,7 @@
           ret.put(leaderBrokerId, dataPerBroker)
       }
 
-      val topicAndPartition = (event.getTopic, brokerPartition.partId)
+      val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
       var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
       dataPerBroker.get(topicAndPartition) match {
         case Some(element) =>
@@ -126,7 +123,7 @@
     debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
     debug("Broker partitions registered for topic: %s are %s"
-      .format(pd.getTopic, topicPartitionsList.map(p => p.partId).mkString(",")))
+      .format(pd.getTopic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
     topicPartitionsList
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1340152)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -146,6 +146,8 @@
     */
   def highWaterMark(): Long = setHighWaterMark.get()
 
+  def getEndOffset(): Long = offset + sizeInBytes()  // this set's offset plus its size in bytes
+
   def checkMutable(): Unit = {
     if(!mutable)
       throw new IllegalStateException("Attempt to invoke mutation on immutable message set.")
@@ -208,7 +210,13 @@
     needRecover.set(false)    
     len - validUpTo
   }
-  
+
+  def truncateUpto(hw: Long) = {  // cut the channel at hw and make hw both the size and the high watermark
+    channel.truncate(hw)  // NOTE(review): leaves the file unchanged when hw >= its current size - confirm callers never pass that
+    setSize.set(hw)
+    setHighWaterMark.set(hw)
+  }
+
   /**
    * Read, validate, and discard a single message, returning the next valid offset, and
    * the message being validated
Index: core/src/main/scala/kafka/common/ErrorMapping.scala
===================================================================
--- core/src/main/scala/kafka/common/ErrorMapping.scala	(revision 1340152)
+++ core/src/main/scala/kafka/common/ErrorMapping.scala	(working copy)
@@ -31,7 +31,7 @@
   val NoError = 0
   val OffsetOutOfRangeCode = 1
   val InvalidMessageCode = 2
-  val WrongPartitionCode = 3
+  val InvalidPartitionCode = 3
   val InvalidFetchSizeCode = 4
   val InvalidFetchRequestFormatCode = 5
 
@@ -39,7 +39,7 @@
     Map[Class[Throwable], Int](
       classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
-      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
+      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode
     ).withDefaultValue(UnknownCode)
Index: core/src/main/scala/kafka/network/SocketServer.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala	(revision 1340152)
+++ core/src/main/scala/kafka/network/SocketServer.scala	(working copy)
@@ -64,6 +64,7 @@
    * Shutdown the socket server
    */
   def shutdown() = {
+    info("Shutting down socket server")
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1340152)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -133,20 +133,21 @@
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
     try {
       // NOTE: first increment epoch, then become leader
       val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
       createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
         "%d;%d".format(brokerId, newEpoch))
       val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
       updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
-        "%s;%d".format(currentISR.mkString(","), newEpoch))
+        "%s;%d".format(updatedISR.mkString(","), newEpoch))
       info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
-      Some(newEpoch)
+      Some(newEpoch, updatedISR)
     } catch {
       case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
-      case oe => None
+      case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
     }
   }
 
@@ -161,7 +162,7 @@
 
     val newEpoch = epoch match {
       case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, epoch))
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
         partitionEpoch + 1
       case None =>
         // this is the first time leader is elected for this partition. So set epoch to 1

Property changes on: core/src/main/scala/kafka/server
___________________________________________________________________
Added: svn:ignore
   + FollowerReplica.scala


Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1340152)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -20,11 +20,12 @@
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import java.net.InetAddress
-import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
 import kafka.admin.AdminUtils
 import java.lang.{Thread, IllegalStateException}
+import kafka.common.{NoLeaderForPartitionException, InvalidPartitionException, KafkaZookeeperClient}
+import kafka.utils.ZkUtils._
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -33,8 +34,10 @@
  *
  */
 class KafkaZooKeeper(config: KafkaConfig,
-                     addReplicaCbk: (String, Int) => Replica,
-                     getReplicaCbk: (String, Int) => Option[Replica]) extends Logging {
+                     addReplicaCbk: (String, Int, Set[Int]) => Replica,
+                     getReplicaCbk: (String, Int) => Option[Replica],
+                     becomeLeader: (Replica, Seq[Int]) => Unit,
+                     becomeFollower: (Replica, Int) => Unit) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
@@ -174,8 +177,9 @@
   private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
     partitions.foreach { partition =>
       val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+      info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
       if(assignedReplicas.contains(config.brokerId)) {
-        val replica = addReplicaCbk(topic, partition)
+        val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
         startReplica(replica)
       } else
         warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
@@ -184,42 +188,127 @@
   }
 
   private def startReplica(replica: Replica) {
-    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partId, replica.brokerId))
-    replica.log match {
-      case Some(log) =>  // log is already started
-      case None =>
-      // TODO: Add log recovery upto the last checkpointed HW as part of KAFKA-46
-    }
-    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partId) match {
-      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
+    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId,
+      replica.brokerId))
+    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,
+        leader))
+        // check if this broker is the leader, if not, then become follower
+        if(leader != config.brokerId)
+          becomeFollower(replica, leader)
       case None => // leader election
         leaderElection(replica)
     }
   }
 
   def leaderElection(replica: Replica) {
-    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partId))
+    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
     // read the AR list for replica.partition from ZK
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt)
-    // TODO: read the ISR as part of KAFKA-302
-    if(assignedReplicas.contains(replica.brokerId)) {
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
+    val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
+    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId,
+                       assignedReplicas, inSyncReplicas, liveBrokers)) {
+      info("Broker %d will participate in leader election for topic %s partition %d".format(config.brokerId, replica.topic,
+                                                                                           replica.partition.partitionId))
       // wait for some time if it is not the preferred replica
       try {
-        if(replica.brokerId != assignedReplicas.head)
-          Thread.sleep(config.preferredReplicaWaitTime)
+        if(replica.brokerId != assignedReplicas.head) {
+          // sleep only if the preferred replica is alive
+          if(liveBrokers.contains(assignedReplicas.head)) {
+            info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) +
+              "partition %d is alive. Waiting for %d ms to allow it to become leader"
+              .format(replica.partition.partitionId, config.preferredReplicaWaitTime))
+            Thread.sleep(config.preferredReplicaWaitTime)
+          }
+        }
       }catch {
         case e => // ignoring
       }
-      val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
-      newLeaderEpoch match {
-        case Some(epoch) =>
-          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
-          // TODO: Become leader as part of KAFKA-302
+      val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
+        replica.partition.partitionId, replica.brokerId)
+      newLeaderEpochAndISR match {
+        case Some(epochAndISR) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic,
+            replica.partition.partitionId))
+          info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId,
+                                                                    epochAndISR._2.mkString(",")))
+          becomeLeader(replica, epochAndISR._2)
         case None =>
+          ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+            case Some(leader) =>
+              becomeFollower(replica, leader)
+            case None =>
+              error("Lost leader for topic %s partition %d right after leader election".format(replica.topic,
+                replica.partition.partitionId))
+          }
       }
     }
   }
 
+  /**
+   * Decide whether broker brokerId may try to become leader for the partition:
+   *  - the ISR is empty: only if the broker is an assigned replica
+   *  - the broker is in the ISR: yes
+   *  - otherwise: only if no broker in the ISR is alive and the broker is an assigned replica
+   * The reason for the decision is logged at info level.
+   */
+  private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int],
+                              inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = {
+    // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
+    assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " +
+      " %s partition %d".format(topic, partition))
+
+    val isAssigned = assignedReplicas.contains(brokerId)
+    if(inSyncReplicas.isEmpty) {
+      // nobody is in sync, so being an assigned replica is what decides
+      if(isAssigned)
+        info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
+          .format(topic, partition, brokerId) + "is part of the assigned replicas list")
+      else
+        info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
+          .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
+      isAssigned
+    }else if(inSyncReplicas.contains(brokerId)) {
+      // a member of the ISR is always eligible
+      info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) +
+        " for topic %s partition %d".format(topic, partition))
+      true
+    }else {
+      // not in the ISR: eligible only when no ISR member is alive and this broker is an assigned replica
+      val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r))
+      if(liveBrokersInISR.isEmpty) {
+        // no in-sync broker is alive: fall back to the assigned replicas
+        val verdict = if(isAssigned) "can become leader since it is" else "cannot become leader since it is not"
+        info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+          " partition %d is alive. Broker %d %s in the assigned replicas %s"
+            .format(partition, brokerId, verdict, assignedReplicas.mkString(",")))
+        isAssigned
+      }else {
+        // let one of the live brokers in the ISR become the leader
+        info("ISR for topic %s partition %d is %s. Out of these %s brokers are alive. Broker %d "
+          .format(topic, partition, inSyncReplicas.mkString(","), liveBrokersInISR.mkString(","), brokerId) +
+          "cannot become leader since it doesn't exist in the ISR")
+        false
+      }
+    }
+  }
+
+  /**
+   * Write newISR to the partition's in-sync path in ZK, keeping the epoch that is currently stored there
+   */
+  def updateInSyncReplicasForPartition(topic: String, partition: Int, newISR: Seq[Int]): Unit = {
+    val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partition.toString))
+    if(replicaListAndEpochString == null)
+      throw new NoLeaderForPartitionException(("Illegal partition state. ISR cannot be updated for topic " +
+        "%s partition %d since leader and ISR does not exist in ZK".format(topic, partition)))
+    // the stored value is "<replica list>;<epoch>"; only the replica list is replaced
+    val epoch = replicaListAndEpochString.split(";").last
+    updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partition.toString),
+      "%s;%s".format(newISR.mkString(","), epoch))
+    info("Updated ISR for topic %s partition %d to %s in ZK".format(topic, partition, newISR.mkString(",")))
+  }
+
   class TopicChangeListener extends IZkChildListener with Logging {
 
     @throws(classOf[Exception])
@@ -270,9 +359,20 @@
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       // handle leader change event for path
-      val newLeader: String = data.asInstanceOf[String]
-      debug("Leader change listener fired for path %s. New leader is %s".format(dataPath, newLeader))
-      // TODO: update the leader in the list of replicas maintained by the log manager
+      val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
+      val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
+      val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
+      debug("Leader change listener fired for path %s. New leader is %d. New epoch is %d".format(dataPath, newLeader, newEpoch))
+      // TODO: update the leader in the list of replicas maintained by the replica manager
+      val topicPartitionInfo = dataPath.split("/")
+      val topic = topicPartitionInfo.takeRight(4).head
+      val partition = topicPartitionInfo.takeRight(2).head.toInt
+      info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
+      val replica = getReplicaCbk(topic, partition).getOrElse(null)
+      assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
+        .format(topic, partition, config.brokerId))
+      replica.partition.leaderId(Some(newLeader))
+      assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
     }
 
     @throws(classOf[Exception])
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1340152)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -20,6 +20,7 @@
 import java.util.Properties
 import kafka.utils.{Utils, ZKConfig}
 import kafka.message.Message
+import kafka.consumer.ConsumerConfig
 
 /**
  * Configuration settings for the kafka server
@@ -105,7 +106,25 @@
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
+  val keepInSyncTime = Utils.getLong(props, "isr.in.sync.time.ms", 30000)
+
   /* size of the state change request queue in Zookeeper */
   val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
 
+  /**
+   * Config options relevant to a follower for a replica
+   */
+  /** the socket timeout for network requests */
+  val socketTimeoutMs = Utils.getInt(props, "follower.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+
+  /** the socket receive buffer for network requests */
+  val socketBufferSize = Utils.getInt(props, "follower.socket.buffersize", ConsumerConfig.SocketBufferSize)
+
+  /** the number of bytes of messages to attempt to fetch */
+  val fetchSize = Utils.getInt(props, "follower.fetch.size", ConsumerConfig.FetchSize)
+
+  val maxWaitTimeMs = Utils.getInt(props, "follower.fetch.wait.time.ms", 500)
+
+  val minBytes = Utils.getInt(props, "follower.fetch.min.bytes", 4086)
+
  }
Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision 1340152)
+++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import org.apache.log4j._
 import kafka.network._
 import kafka.utils._
 
@@ -42,7 +41,7 @@
 
 class KafkaRequestHandlerPool(val requestChannel: RequestChannel, 
                               val apis: KafkaApis, 
-                              numThreads: Int) { 
+                              numThreads: Int) extends Logging {
   
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
@@ -53,10 +52,12 @@
   }
   
   def shutdown() {
+    info("Shutting down request handlers")
     for(handler <- runnables)
       handler.shutdown
     for(thread <- threads)
       thread.join
+    info("Request handlers shut down")
   }
   
 }
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1340152)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -21,15 +21,17 @@
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
-import kafka.cluster.Replica
 import java.util.concurrent._
 import atomic.AtomicBoolean
+import kafka.cluster.{Partition, Replica}
+import locks.ReentrantLock
+import java.lang.IllegalStateException
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig) extends Logging {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
@@ -39,7 +41,10 @@
   var requestHandlerPool: KafkaRequestHandlerPool = null
   private var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
-  val replicaManager = new ReplicaManager(config)
+  private var replicaManager: ReplicaManager = null
+  private var leaderReplicas = new PriorityBlockingQueue[Partition](100, new PartitionHWComparator)
+  private var isrExpirationThread: ISRExpirationThread = null
+  private val leaderISRUpdateLock = new ReentrantLock()
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -68,9 +73,12 @@
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
+    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, becomeLeader, becomeFollower)
 
-    val apis = new KafkaApis(socketServer.requestChannel, logManager, kafkaZookeeper)
+    replicaManager = new ReplicaManager(config, time)
+
+    val apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager,
+      kafkaZookeeper, updateLeaderAndISRInCacheAndZk)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
     socketServer.startup
 
@@ -84,6 +92,14 @@
 
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup
+
+    // start ISR expiration thread
+    isrExpirationThread =  new ISRExpirationThread("ISRExpirationThread-" + config.brokerId,
+                                                              config, time,
+                                                              leaderReplicas,
+                                                              updateLeaderAndISRInCacheAndZk)
+    isrExpirationThread.start()
+
     info("Server started.")
   }
   
@@ -95,7 +111,10 @@
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("Shutting down Kafka server with id " + config.brokerId)
-      kafkaZookeeper.close
+      if(isrExpirationThread != null)
+        isrExpirationThread.shutdown()
+      if(replicaManager != null)
+        replicaManager.close()
       if (socketServer != null)
         socketServer.shutdown()
       if(requestHandlerPool != null)
@@ -103,6 +122,7 @@
       Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
         logManager.close()
+      kafkaZookeeper.close
 
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
@@ -117,14 +137,67 @@
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
-  def addReplica(topic: String, partition: Int): Replica = {
+  def becomeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    // stop replica fetcher thread, if any
+    replica.stopReplicaFetcherThread()
+    // cache the leader and the ISR read from ZK; the lock is taken before try so finally only unlocks a held lock
+    leaderISRUpdateLock.lock()
+    try {
+      replicaManager.updateLeaderInCache(replica)
+      replicaManager.updateISRInCache(replica.topic, replica.partition.partitionId, currentISRInZk)
+    }finally {
+      leaderISRUpdateLock.unlock()
+    }
+    // also add this partition to the ISR expiration priority queue
+    leaderReplicas.put(replica.partition)
+  }
+
+  def becomeFollower(replica: Replica, leaderBrokerId: Int) {
+    info("Broker %d becoming follower to leader %d for topic %s partition %d"
+      .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+    // drop this replica's partition from the queue that the ISR expiration
+    // thread consumes
+    // TODO: This might need some synchronization with the ISR expiration thread
+    leaderReplicas.remove(replica.partition)
+    // when a local log is already attached, run its recoverUptoLastCheckpointedHW();
+    // nothing to do when the replica has no log yet
+    replica.log.foreach(_.recoverUptoLastCheckpointedHW())
+    // look up the broker info of the leader for this replica
+    // TODO: Become follower only if it is not already following the same leader
+    val leaderCandidates = ZkUtils.getBrokerInfoFromIds(kafkaZookeeper.getZookeeperClient, List(leaderBrokerId))
+    val leaderBroker = leaderCandidates.head
+    // start fetcher thread to current leader
+    replica.startReplicaFetcherThread(leaderBroker, config)
+  }
+
+  def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
+    info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
     // get local log
     val log = logManager.getOrCreateLog(topic, partition)
-    replicaManager.addLocalReplica(topic, partition, log)
+    replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
   }
 
-  def getReplica(topic: String, partition: Int): Option[Replica] = replicaManager.getReplica(topic, partition)
+  def updateLeaderAndISRInCacheAndZk(topic: String, partition: Int, newISR: Seq[Int], newLeader: Option[Replica] = None) {
+    // take the lock before entering try so that finally never unlocks a lock this thread does not hold
+    leaderISRUpdateLock.lock()
+    try {
+      // update the cached leader only when the caller supplied one
+      newLeader.foreach(leader => replicaManager.updateLeaderInCache(leader))
+      // write the new ISR to ZK, then mirror it in the in-memory cache
+      kafkaZookeeper.updateInSyncReplicasForPartition(topic, partition, newISR)
+      replicaManager.updateISRInCache(topic, partition, newISR.toList)
+    }catch {
+      // every failure is surfaced as IllegalStateException with the original cause attached
+      case e: Throwable => throw new IllegalStateException("Failed to update ISR for topic %s ".format(topic) +
+        "partition %d to %s".format(partition, newISR.mkString(",")), e)
+    }finally {
+      leaderISRUpdateLock.unlock()
+    }
+  }
 
+  def getReplica(topic: String, partition: Int): Option[Replica] =
+    replicaManager.getReplica(topic, partition)
+
   def getLogManager(): LogManager = logManager
 
   def getStats(): SocketServerStats = socketServer.stats
Index: core/src/main/scala/kafka/server/RequestPurgatory.scala
===================================================================
--- core/src/main/scala/kafka/server/RequestPurgatory.scala	(revision 1340152)
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala	(working copy)
@@ -21,7 +21,6 @@
 import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.api._
 import kafka.network._
 import kafka.utils._
 
Index: core/src/main/scala/kafka/server/ISRExpirationThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ISRExpirationThread.scala	(revision 0)
+++ core/src/main/scala/kafka/server/ISRExpirationThread.scala	(revision 0)
@@ -0,0 +1,102 @@
+package kafka.server
+
+import java.util.concurrent.locks.ReentrantLock
+import java.util.{Date, Comparator}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{CountDownLatch, PriorityBlockingQueue}
+import kafka.utils.{Time, Logging}
+import kafka.cluster.{Replica, Partition}
+
+/** Takes partitions from leaderReplicas, waits until hwUpdateTime + keepInSyncTime has passed, then shrinks and publishes their ISR. */
+class ISRExpirationThread(name: String, config: KafkaConfig, time: Time, leaderReplicas: PriorityBlockingQueue[Partition],
+                          updateISRForPartition: (String, Int, Seq[Int], Option[Replica]) => Unit)
+  extends Thread(name) with Logging {
+  private val keepInSyncLock = new ReentrantLock()
+  private val hasKeepInSyncTimeExpired = keepInSyncLock.newCondition()
+  private val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+
+  override def run() {
+    try {
+      keepInSyncLock.lock()
+      info("Started ISR expiration thread on leader " + config.brokerId)
+      while(isRunning.get()) {
+        val slowestPartition = leaderReplicas.take()
+        info("Fetched partition with the earliest ISR expiration time " + slowestPartition.toString())
+        var expirationTime = new Date(slowestPartition.hwUpdateTime + config.keepInSyncTime)
+        var expirationTimeNotReached = true
+        while(expirationTime.getTime > time.milliseconds) {
+          info("Waiting until %s for partition %s's ISR expiration time".format(expirationTime.toString, slowestPartition.toString()))
+          expirationTimeNotReached = hasKeepInSyncTimeExpired.awaitUntil(expirationTime)
+          if(expirationTimeNotReached) {
+            // woken before the deadline; the head of the priority queue may have changed, so compute the deadline again.
+            Option(leaderReplicas.peek()).foreach(head => // peek() is null if the queue was emptied concurrently
+              expirationTime = new Date(head.hwUpdateTime + config.keepInSyncTime))
+          }
+          // else, awaitUntil returned false: the deadline has elapsed and the loop condition re-checks the clock
+        }
+        info("%d ms has elapsed. Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
+          .format(config.keepInSyncTime))
+        // deadline has elapsed. Shrink the ISR of partitions whose HW hasn't been updated for keep.in.sync.time.ms
+        val topic = slowestPartition.topic
+        val partitionId = slowestPartition.partitionId
+
+        // shrink the ISR for this partition: keep only assigned replicas whose LEO has reached the leader HW
+        val inSyncReplicas = slowestPartition.assignedReplicas().filter(r => r.leo() >= slowestPartition.leaderHW())
+
+        info("Shrinking ISR for topic %s partition %d to %s".format(topic, partitionId,
+          inSyncReplicas.map(_.brokerId).mkString(",")))
+        val leaderReplica = slowestPartition.leaderReplica()
+
+        // update ISR in zk and in memory
+        updateISRForPartition(topic, partitionId, inSyncReplicas.map(_.brokerId).toSeq, None)
+
+        slowestPartition.leaderHW(Some(leaderReplica.hw()))
+        info("Setting leader HW for topic %s partition %d to %d".format(topic, partitionId, leaderReplica.hw()))
+        // add this partition with updated HW timestamp and HW back into the priority queue
+        leaderReplicas.add(slowestPartition)
+      }
+    }catch {
+      case e: InterruptedException => info("ISR expiration thread %s interrupted. Shutting down".format(name))
+      case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
+    }
+    finally {
+      shutdownComplete()
+      keepInSyncLock.unlock()
+    }
+  }
+
+  /**
+   * IMP NOTE: This API is only for test usage
+   */
+  def signal() {
+    keepInSyncLock.lock()
+    try {
+      hasKeepInSyncTimeExpired.signal()
+    }finally {
+      keepInSyncLock.unlock()
+    }
+  }
+
+  private def shutdownComplete() = {
+    shutdownLatch.countDown()
+  }
+
+  def shutdown() {
+    info("Shutting down ISR expiration thread")
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("ISR expiration thread shutdown completed")
+  }
+}
+
+class PartitionHWComparator extends Comparator[Partition] {
+  // orders partitions by ascending hwUpdateTime
+  override def compare(partition1: Partition, partition2: Partition): Int = {
+    val (first, second) = (partition1.hwUpdateTime, partition2.hwUpdateTime)
+    if(first > second) 1
+    else if(first < second) -1
+    else 0
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 0)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 0)
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+import kafka.api.FetchRequestBuilder
+import kafka.utils.Logging
+import kafka.cluster.{Broker, Replica}
+import kafka.consumer.SimpleConsumer
+
+/** Fetches from leaderBroker at the replica's LEO in a loop, appending messages to the local log and recording the HW from each response. */
+class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig)
+  extends Thread(name) with Logging {
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+  private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port, config.socketTimeoutMs, config.socketBufferSize)
+
+  override def run() {
+    try {
+      info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId))
+      while(isRunning.get()) {
+        val builder = new FetchRequestBuilder().
+          clientId(name).
+          replicaId(replica.brokerId).
+          maxWait(config.maxWaitTimeMs).
+          minBytes(config.minBytes)
+
+        // TODO: Keep this simple single fetch for now. Change it to fancier multi fetch when message replication actually works
+        val fetchOffset = replica.leo()
+        trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d"
+          .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset))
+        builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.fetchSize)
+
+        val fetchRequest = builder.build()
+        val response = replicaConsumer.fetch(fetchRequest)
+        // append messages to local log
+        replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
+        replica.hw(Some(response.data.head.partitionData.head.hw))
+        trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+          .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.hw()))
+      }
+    }catch {
+      case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name))
+      case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1)
+    }finally {
+      shutdownComplete() // runs even if a handler above throws, so shutdown() cannot hang on the latch
+    }
+  }
+
+  private def shutdownComplete() = {
+    // count the latch down even if closing the consumer fails, so shutdown() never blocks forever
+    try replicaConsumer.close() finally shutdownLatch.countDown()
+  }
+
+  def shutdown() {
+    info("Shutting down replica fetcher thread")
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("Replica fetcher thread shutdown completed")
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1340152)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.io.IOException
-import java.lang.IllegalStateException
 import java.util.concurrent.atomic._
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
@@ -30,12 +29,16 @@
 import kafka.utils.{SystemTime, Logging}
 import kafka.common._
 import scala.math._
+import java.lang.IllegalStateException
+import kafka.cluster.Replica
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
-  
+class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
+                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper,
+                updateISRForPartition: (String, Int, Seq[Int], Option[Replica]) => Unit) extends Logging {
+
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -74,7 +77,7 @@
     }
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
-       val topicData = readMessageSets(fetchReq.fetch.offsetInfo)
+       val topicData = readMessageSets(fetchReq.fetch)
        val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
        requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
@@ -128,18 +131,32 @@
 
     // validate the request
     try {
-      fetchRequest.validate()  
+      fetchRequest.validate()
     } catch {
       case e:FetchRequestFormatException =>
         val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode), -1)
+        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response,
+          ErrorMapping.InvalidFetchRequestFormatCode), -1)
         requestChannel.sendResponse(channelResponse)
     }
-    
+
+    // check if the current broker is the leader for the partitions
+    val partitionVerificationData: Option[Array[TopicData]] = ensureLeaderOnThisBroker(fetchRequest)
+    partitionVerificationData match {
+      case Some(partitionVerificationResponse) =>
+        val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, partitionVerificationResponse)
+        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response,
+          ErrorMapping.InvalidPartitionCode), -1)
+        requestChannel.sendResponse(channelResponse)
+      case  None =>
+    }
+
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
     if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
-      val topicData = readMessageSets(fetchRequest.offsetInfo)
+      val topicData = readMessageSets(fetchRequest)
+      debug("Returning fetch response %s for fetch request with correlation id %d"
+        .format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     } else {
@@ -158,25 +175,51 @@
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
         try {
-          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i)) 
+          debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
+          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
           val available = maybeLog match {
             case Some(log) => max(0, log.getHighwaterMark - offsetDetail.offsets(i))
             case None => 0
           }
-    	  totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+          totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
-          case e: InvalidPartitionException => 
+          case e: InvalidPartitionException =>
             info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
         }
       }
     }
     totalBytes
   }
-  
+
+  private def ensureLeaderOnThisBroker(fetchRequest: FetchRequest): Option[Array[TopicData]] = {
+    // per topic, collect an error entry for every partition rejected by kafkaZookeeper.ensurePartitionOnThisBroker
+    val rejectedTopics = new mutable.ArrayBuffer[TopicData]()
+
+    for(offsetDetail <- fetchRequest.offsetInfo) {
+      val rejectedPartitions = new mutable.ArrayBuffer[PartitionData]()
+      val partitionsWithOffsets = (offsetDetail.partitions, offsetDetail.offsets).zipped.map((_,_))
+      partitionsWithOffsets.foreach { case (partition, offset) =>
+        try {
+          kafkaZookeeper.ensurePartitionOnThisBroker(offsetDetail.topic, partition)
+        }catch {
+          case e: InvalidPartitionException =>
+            rejectedPartitions.append(new PartitionData(partition, ErrorMapping.InvalidPartitionCode, offset, -1L,
+              MessageSet.Empty))
+        }
+      }
+      // topics whose partitions all passed the check are left out of the result
+      if(rejectedPartitions.size > 0)
+        rejectedTopics.append(new TopicData(offsetDetail.topic, rejectedPartitions.toArray))
+    }
+    if(rejectedTopics.isEmpty) None
+    else Some(rejectedTopics.toArray)
+  }
+
   /**
    * Read from all the offset details given and produce an array of topic datas
    */
-  private def readMessageSets(offsets: Seq[OffsetDetail]): Array[TopicData] = {
+  private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
+    val offsets = fetchRequest.offsetInfo
     val fetchedData = new mutable.ArrayBuffer[TopicData]()
     for(offsetDetail <- offsets) {
       val info = new mutable.ArrayBuffer[PartitionData]()
@@ -184,8 +227,48 @@
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
         val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
-          case Left(err) => new PartitionData(partition, err, offset, MessageSet.Empty)
-          case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages)
+          case Left(err) =>
+            fetchRequest.replicaId match {
+              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+              case _ =>
+                val replicaOpt = replicaManager.getLeaderReplica(topic, partition)
+                replicaOpt match {
+                  case Some(replica) =>
+                    new PartitionData(partition, err, offset, replica.hw(), MessageSet.Empty)
+                  case None =>
+                    new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+                }
+            }
+          case Right(messages) =>
+            fetchRequest.replicaId match {
+              case -1 => new PartitionData(partition, ErrorMapping.NoError, offset, -1L, messages)
+              case _ => // fetch request from a follower
+                val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
+                replicaOpt match {
+                  case Some(replica) =>
+                    debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+                      .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                    replicaManager.updateReplicaLEO(replica, offset)
+                    // check if this replica needs to be added to the ISR
+                    if(replicaManager.checkIfISRCanBeExpanded(replica)) {
+                      val newISR = replica.partition.inSyncReplicas.map(_.brokerId) + replica.brokerId
+                      // update ISR in ZK and cache
+                      updateISRForPartition(topic, partition, newISR.toSeq, None)
+                    }
+                    replicaManager.maybeIncrementLeaderHW(replica)
+                    val leaderReplicaOpt = replicaManager.getLeaderReplica(topic, partition)
+                    leaderReplicaOpt match {
+                      case Some(leaderReplica) =>
+                        new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.hw(), messages)
+                      case None =>
+                        throw new IllegalStateException("Leader replica for topic %s partition %d".format(topic, partition) +
+                          " must exist on leader broker %d".format(logManager.config.brokerId))
+                    }
+                  case None =>
+                    throw new IllegalStateException("No replica %d in replica manager on %d"
+                      .format(fetchRequest.replicaId, replicaManager.config.brokerId))
+                }
+            }
         }
         info.append(partitionInfo)
       }
@@ -286,7 +369,7 @@
      * When a request expires just answer it with whatever data is present
      */
     def expire(delayed: DelayedFetch) {
-      val topicData = readMessageSets(delayed.fetch.offsetInfo)
+      val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1340152)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -18,47 +18,157 @@
 
 import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
-import kafka.utils.Logging
 import collection.mutable
+import kafka.common.NoLeaderForPartitionException
+import java.util.concurrent.PriorityBlockingQueue
+import java.lang.IllegalStateException
+import kafka.utils.{Time, Logging}
 
-class ReplicaManager(config: KafkaConfig) extends Logging {
+class ReplicaManager(val config: KafkaConfig, time: Time) extends Logging {
 
-  private var replicas: mutable.Map[(String, Int), Replica] = new mutable.HashMap[(String, Int), Replica]()
+  private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
 
-  def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
-        r.log match {
+  def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
+    val partition = makeSurePartitionExists(topic, partitionId, assignedReplicaIds)
+    val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+    // reuse the replica already registered for this broker if there is one, otherwise register localReplica
+    val replicaOpt = partition.getReplica(config.brokerId)
+    replicaOpt match {
+      case Some(replica) =>
+        info("Changing remote replica %s into a local replica".format(replica.toString))
+        replica.log match {
           case None =>
-            r.log = Some(log)
-          case Some(l) => // nothing to do since log already exists
+            replica.log = Some(log)
+          case Some(log) => // nothing to do since log already exists (this `log` shadows the method parameter)
         }
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
-        replicas += (topic, partitionId) -> replica
-        info("Added local replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        partition.addReplica(localReplica)
     }
-    replicas.get((topic, partitionId)).get
+    val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get) // .get throws if an assigned id has no replica here
+    partition.assignedReplicas(Some(assignedReplicas))
+    // NOTE(review): if a replica for this broker already existed, localReplica is returned but was never added to the partition - confirm intended
+    info("Added local replica %d for topic %s partition %s on broker %d"
+      .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+    localReplica
   }
 
-  def addRemoteReplica(topic: String, partitionId: Int): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
+  /**
+   * Return the cached partition for (topic, partitionId). If it is not cached yet, create it, cache it
+   * and add a remote replica for every assigned replica id other than this broker's own id.
+   */
+  def makeSurePartitionExists(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
+    val key = (topic, partitionId)
+    allReplicas.get(key).getOrElse {
+      val created = new Partition(topic, partitionId, time)
+      allReplicas += key -> created
+      for(remoteId <- assignedReplicaIds - config.brokerId) addRemoteReplica(topic, partitionId, remoteId, created)
+      created
+    }
+  }
+
+  def assurePartitionExists(topic: String, partitionId: Int): Partition = {
+    val partitionOpt = allReplicas.get((topic, partitionId)) // lookup only; nothing is created here
+    partitionOpt match {
+      case Some(partition) => partition // already cached, hand it back unchanged
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
-        replicas += (topic, partitionId) -> replica
-        info("Added remote replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        throw new IllegalStateException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
+        .format(topic, partitionId, config.brokerId))
     }
-    replicas.get((topic, partitionId)).get
   }
 
-  def getReplica(topic: String, partitionId: Int): Option[Replica] = {
-    replicas.get((topic, partitionId))
+  // Build a Replica for replicaId (constructed without a log argument), offer it to the partition and return it.
+  def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
+    val remote = new Replica(replicaId, partition, topic)
+    // only log when the partition reports that the replica was actually added
+    if(partition.addReplica(remote))
+      info("Added remote replica %d for topic %s partition %s on broker %d"
+        .format(remote.brokerId, remote.topic, remote.partition.partitionId, config.brokerId))
+    remote
   }
-}
\ No newline at end of file
+
+  /**
+   * Look up a replica of the given topic partition in the replica manager's cache.
+   * The replica id defaults to this broker's id. Returns None if the partition is not
+   * in the cache, otherwise whatever the partition's own getReplica lookup returns.
+   */
+  def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
+    val cachedPartition = allReplicas.get((topic, partitionId))
+    cachedPartition.flatMap(_.getReplica(replicaId))
+  }
+
+  /**
+   * Return the leader replica of the given partition wrapped in Some. Throws IllegalStateException
+   * (rather than returning None) when the partition is not in this replica manager's cache.
+   */
+  def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
+    val cachedPartition = allReplicas.get((topic, partitionId)).getOrElse(
+      throw new IllegalStateException("Getting leader replica failed. Partition replica metadata for topic " +
+        "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId)))
+    Some(cachedPartition.leaderReplica())
+  }
+
+  // Cached partition for (topic, partitionId), or None if this replica manager does not know it.
+  def getPartition(topic: String, partitionId: Int): Option[Partition] = allReplicas.get((topic, partitionId))
+
+  // Delegate the leo update for the replica to its cached partition; throws if the partition is not cached here.
+  def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+    val owner = replica.partition
+    assurePartitionExists(replica.topic, owner.partitionId).updateReplicaLEO(replica, fetchOffset)
+  }
+
+  // Advance the leader HW to the min leo of the in-sync replicas, but only if that moves the HW forward.
+  // Throws IllegalStateException (from assurePartitionExists) if the partition is not cached on this broker.
+  def maybeIncrementLeaderHW(replica: Replica) {
+    val partition = assurePartitionExists(replica.topic, replica.partition.partitionId)
+    val newHw = partition.inSyncReplicas.map(_.leo()).min
+    val oldHw = partition.leaderHW()
+    // the HW must only move forward: apply the candidate only when it is ahead of the current value
+    if(newHw > oldHw) {
+      debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
+      partition.leaderHW(Some(newHw))
+    }
+  }
+
+  // Point the cached partition's leader at the given replica's broker; throws if the partition is not cached.
+  def updateLeaderInCache(newLeaderReplica: Replica) {
+    val newLeaderId = newLeaderReplica.brokerId
+    val cached = assurePartitionExists(newLeaderReplica.topic, newLeaderReplica.partition.partitionId)
+    info("Updating leader for for topic %s partition %d to replica %d".format(cached.topic, cached.partitionId, newLeaderId))
+    cached.leaderId(Some(newLeaderId))
+  }
+
+  // Replace the cached ISR of the partition with the replicas for the broker ids read from ZK. Throws
+  // IllegalStateException if the partition, or any replica listed in the ISR, is not cached on this broker.
+  def updateISRInCache(topic: String, partitionId: Int, isrInZk: Seq[Int]) = {
+    val partition = assurePartitionExists(topic, partitionId)
+    val inSyncReplicas = isrInZk.map { rid =>
+      // %d needs the numeric partition id; formatting the Partition object itself would fail inside format()
+      partition.getReplica(rid).getOrElse(
+        throw new IllegalStateException("In sync replica %d for topic %s ".format(rid, topic) +
+          "partition %d doesn't exist in replica manager".format(partition.partitionId)))
+    }
+    partition.inSyncReplicas = inSyncReplicas.toSet
+    info("Updated ISR for for topic %s partition %d to %s in cache".format(topic, partition.partitionId, isrInZk.mkString(",")))
+  }
+
+  /**
+   * Decide whether the replica should be added to the ISR of its partition: false if it is already in sync,
+   * otherwise true once its leo has reached the leader HW. Throws NoLeaderForPartitionException if the
+   * partition is not cached here and IllegalStateException if the replica is not an assigned replica.
+   */
+  def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
+    val cached = allReplicas.get((replica.topic, replica.partition.partitionId)).getOrElse(
+      throw new NoLeaderForPartitionException("Check for ISR expansion failed on %d. Leader replica for ".format(config.brokerId) +
+        "topic %s partition %d doesn't exist in the replica manager".format(replica.topic, replica.partition.partitionId)))
+    val alreadyInSync = cached.inSyncReplicas.contains(replica)
+    if(!alreadyInSync && !cached.assignedReplicas().contains(replica))
+      throw new IllegalStateException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+        " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+    // an assigned replica that is not yet in sync qualifies once it has caught up to the leader HW
+    !alreadyInSync && cached.leaderHW() <= replica.leo()
+  }
+
+  // Close every assigned replica of every partition cached in this replica manager.
+  def close() {
+    for(partition <- allReplicas.values; replica <- partition.assignedReplicas()) replica.close() }
+}
Index: core/src/main/scala/kafka/api/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchResponse.scala	(revision 1340152)
+++ core/src/main/scala/kafka/api/FetchResponse.scala	(working copy)
@@ -29,18 +29,20 @@
     val partition = buffer.getInt
     val error = buffer.getInt
     val initialOffset = buffer.getLong
+    val hw = buffer.getLong()
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
   }
 }
 
-case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
-  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
+case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L,
+                         messages: MessageSet) {
+  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8
 
-  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
+  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 
   def translatePartition(topic: String, randomSelector: String => Int): Int = {
     if (partition == ProducerRequest.RandomPartition)
@@ -123,6 +125,14 @@
     }
     messageSet.asInstanceOf[ByteBufferMessageSet]
   }
+
+  // High watermark reported for the given topic partition, or -1 when the topic or the partition is absent.
+  def highWatermark(topic: String, partition: Int): Long = {
+    val hwOpt = topicMap.get(topic).flatMap { topicData =>
+      TopicData.findPartition(topicData.partitionData, partition).map(_.hw)
+    }
+    hwOpt.getOrElse(-1L)
+  }
 }
 
 // SENDS
@@ -131,10 +141,11 @@
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(20)
+  private val buffer = ByteBuffer.allocate(28)
   buffer.putInt(partitionData.partition)
   buffer.putInt(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
+  buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes.intValue())
   buffer.rewind()
 
Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
===================================================================
--- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(revision 1340152)
+++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(working copy)
@@ -272,7 +272,7 @@
         } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
             throw new IOException(_input + " current offset=" + _offset
                     + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.WrongPartitionCode()) {
+        } else if (errorCode == ErrorMapping.InvalidPartitionCode()) {
             throw new IOException(_input + " : wrong partition");
         } else if (errorCode != ErrorMapping.NoError()) {
             throw new IOException(_input + " current offset=" + _offset
