Index: core/src/test/scala/unit/kafka/utils/TestUtils.scala
===================================================================
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala	(working copy)
@@ -32,10 +32,10 @@
 import collection.mutable.ListBuffer
 import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
 import scala.collection.Map
-import kafka.serializer.Encoder
-import kafka.api.{ProducerRequest, TopicData, PartitionData}
+import kafka.api.{TopicData, PartitionData}
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
+import kafka.serializer.{DefaultEncoder, Encoder}
 
 /**
  * Utility functions to help with testing
@@ -122,6 +122,7 @@
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval", "1")
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props.put("replica.socket.timeout.ms", "1500")
     props
   }
   
@@ -280,12 +281,13 @@
   /**
    * Create a producer for the given host and port
    */
-  def createProducer[K, V](zkConnect: String): Producer[K, V] = {
+  def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
     val props = new Properties()
     props.put("zk.connect", zkConnect)
     props.put("buffer.size", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
+    props.put("serializer.class", encoder.getClass.getName) // binary class name; getCanonicalName is null for anonymous/local encoders and dotted for nested ones (presumably loaded reflectively - confirm)
     new Producer[K, V](new ProducerConfig(props))
   }
 
Index: core/src/test/scala/unit/kafka/log/SegmentListTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/SegmentListTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/log/SegmentListTest.scala	(working copy)
@@ -49,6 +49,53 @@
   }
   
   @Test
+  def testTruncLast() { // per the assertions below, truncLast(i) should keep elements 0..i and return the removed tail
+    { // case 1: truncate in the middle of the list
+      val hd = List(1,2,3)
+      val tail = List(4,5,6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view // view obtained before the truncation
+      assertEquals(hd ::: tail, view.iterator.toList)
+      val deleted = sl.truncLast(2)
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+
+    { // case 2: illegal indices are expected to be rejected, then only the last element is truncated
+      val hd = List(1,2,3,4,5)
+      val tail = List(6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      try {
+        sl.truncLast(6)
+        sl.truncLast(5) // NOTE(review): not reached when the call above throws, so index 5 is never actually checked
+        sl.truncLast(-1) // NOTE(review): likewise unreached; each illegal index needs its own try/fail to be verified
+        fail("Attempt to truncate with illegal index should fail")
+      }catch {
+        case e: IllegalArgumentException => // this is ok
+      }
+      val deleted = sl.truncLast(4)
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+
+    { // case 3: truncate right after the first element
+      val hd = List(1)
+      val tail = List(2,3,4,5,6)
+      val sl = new SegmentList(hd ::: tail)
+      val view = sl.view
+      assertEquals(hd ::: tail, view.iterator.toList)
+      val deleted = sl.truncLast(0)
+      assertEquals(hd, sl.view.iterator.toList)
+      assertEquals(tail, deleted.iterator.toList)
+      assertEquals("View should remain consistent", hd ::: tail, view.iterator.toList)
+    }
+  }
+
+  @Test
   def testTruncBeyondList() {
     val sl = new SegmentList(List(1, 2))
     sl.trunc(3)
Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -63,33 +63,6 @@
   }
 
   @Test
-  def testEmptyLogs() {
-    val fetchResponse = simpleConsumer.fetch(new FetchRequestBuilder().addFetch("test", 0, 0, 300 * 1024).build())
-    assertFalse(fetchResponse.messageSet("test", 0).iterator.hasNext)
-
-    val name = "test"
-    val logFile = new File(logDir, name + "-0")
-    
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
-      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
-      assertTrue(!logFile.exists())
-    }
-
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
-      assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
-      assertTrue(!logFile.exists())
-    }
-
-    {
-      val offsets = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
-      assertEquals( 0, offsets.length )
-      assertTrue(!logFile.exists())
-    }
-  }
-
-  @Test
   def testGetOffsetsBeforeLatestTime() {
     val topicPartition = "kafka-" + 0
     val topic = topicPartition.split("-").head
Index: core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala	(working copy)
@@ -27,9 +27,9 @@
 import kafka.network._
 import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
 import kafka.cluster.Broker
-import kafka.server.{KafkaZooKeeper, KafkaApis, KafkaConfig}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
+import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -80,6 +80,7 @@
     // topic metadata request only requires 2 APIs from the log manager
     val logManager = EasyMock.createMock(classOf[LogManager])
     val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
+    val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(kafkaZookeeper.getZookeeperClient).andReturn(zkClient)
     EasyMock.expect(logManager.getServerConfig).andReturn(configs.head)
     EasyMock.replay(logManager)
@@ -94,7 +95,7 @@
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, kafkaZookeeper)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Index: core/src/test/scala/unit/kafka/integration/FetcherTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/integration/FetcherTest.scala	(working copy)
@@ -30,10 +30,11 @@
 import kafka.producer.{ProducerData, Producer}
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
+import kafka.admin.CreateTopicCommand
 
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
-  val numNodes = 2
+  val numNodes = 1
   val configs = 
     for(props <- TestUtils.createBrokerConfigs(numNodes))
       yield new KafkaConfig(props)
@@ -54,6 +55,7 @@
 
   override def setUp() {
     super.setUp
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
     fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
     fetcher.stopConnectionsToAllBrokers
     fetcher.startConnections(topicInfos, cluster, null)
@@ -69,11 +71,9 @@
     var count = sendMessages(perNode)
     waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     fetch(count)
-    Thread.sleep(100)
     assertQueueEmpty()
     count = sendMessages(perNode)
     fetch(count)
-    Thread.sleep(100)
     assertQueueEmpty()
   }
   
Index: core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala	(working copy)
@@ -18,8 +18,6 @@
 package kafka.integration
 
 import junit.framework.Assert._
-import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.atomic.AtomicInteger
 import kafka.utils.{ZKGroupTopicDirs, Logging}
 import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
 import kafka.server._
@@ -61,10 +59,10 @@
   def testResetToEarliestWhenOffsetTooLow() =
     assertEquals(NumMessages, resetAndConsume(NumMessages, "smallest", SmallOffset))
     
-  def testResetToLatestWhenOffsetTooHigh() = 
+  def testResetToLatestWhenOffsetTooHigh() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", LargeOffset))
-    
-  def testResetToLatestWhenOffsetTooLow() = 
+
+  def testResetToLatestWhenOffsetTooLow() =
     assertEquals(0, resetAndConsume(NumMessages, "largest", SmallOffset))
   
   /* Produce the given number of messages, create a consumer with the given offset policy, 
Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -17,23 +17,22 @@
 
 package kafka.integration
 
-import java.io.File
 import java.nio.ByteBuffer
-import java.util.Properties
 import junit.framework.Assert._
-import kafka.admin.CreateTopicCommand
 import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
-import kafka.common.{FetchRequestFormatException, OffsetOutOfRangeException, InvalidPartitionException}
-import kafka.message.Message
+import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
 import kafka.serializer.StringDecoder
-import kafka.server.{KafkaRequestHandler, KafkaConfig}
+import kafka.message.Message
 import kafka.utils.{TestZKUtils, TestUtils}
 import org.apache.log4j.{Level, Logger}
 import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
+import kafka.admin.CreateTopicCommand
+import kafka.common.{InvalidPartitionException, NotLeaderForPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -93,7 +92,7 @@
       case e: FetchRequestFormatException => "success"
     }
   }
-  
+
   def testDefaultEncoderProducerAndFetch() {
     val topic = "test-topic"
     val props = new Properties()
@@ -158,7 +157,7 @@
         messages += topic -> messageList
         producer.send(producerData)
         builder.addFetch(topic, partition, 0, 10000)
-      }
+    }
 
       // wait a bit for produced message to be available
       Thread.sleep(700)
@@ -199,8 +198,9 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
+        for( (topic, partition) <- topics) {
           response.messageSet(topic, -1).iterator
+        }
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: InvalidPartitionException => "this is good"
@@ -332,10 +332,10 @@
 
   def testConsumerNotExistTopic() {
     val newTopic = "new-topic"
+    CreateTopicCommand.createTopic(zkClient, newTopic, 1, 1, config.brokerId.toString) // the topic is now created up front, so despite the test name the fetch below targets an existing topic
+    Thread.sleep(200) // NOTE(review): fixed sleep, presumably to let the broker pick up the new topic; other tests in this patch call waitUntilLeaderIsElected instead - confirm whether that fits here
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
-    val logFile = new File(config.logDir, newTopic + "-0")
-    assertTrue(!logFile.exists)
   }
 
   /**
Index: core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala	(working copy)
@@ -27,6 +27,7 @@
 
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
 
 class BackwardsCompatibilityTest extends JUnit3Suite with KafkaServerTestHarness {
 
@@ -59,6 +60,8 @@
 
   // test for reading data with magic byte 0
   def testProtocolVersion0() {
+    CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
+    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
     var fetchOffset: Long = 0L
     var messageCount: Int = 0
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -157,7 +157,7 @@
     // stop IO threads and request handling, but leave networking operational
     // any requests should be accepted and queue up, but not handled
     server.requestHandlerPool.shutdown()
-    
+
     val t1 = SystemTime.milliseconds
     try {
       val response2 = producer.send(request)
Index: core/src/test/scala/unit/kafka/producer/ProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala	(working copy)
@@ -91,32 +91,30 @@
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
-
+    // create topic with 1 partition
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
     val producer = new Producer[String, String](config)
     try {
-      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0, but
-      // since partition 0 can exist on any of the two brokers, we need to fetch from both brokers
+      // Available partition ids should be 0.
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(100)
       producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      Thread.sleep(1000)
-      // cross check if one of the brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", (messageSet1.hasNext || messageSet2.hasNext))
+      // get the leader
+      val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
+      assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
+      val leader = leaderOpt.get
 
-      if(messageSet1.hasNext) {
-        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
-        assertTrue("Message set should have 1 message", messageSet1.hasNext)
-        assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+      val messageSet = if(leader == server1.config.brokerId) {
+        val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+        response1.messageSet("new-topic", 0).iterator
+      }else {
+        val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+        response2.messageSet("new-topic", 0).iterator
       }
-      else {
-        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
-        assertTrue("Message set should have 1 message", messageSet2.hasNext)
-        assertEquals(new Message("test1".getBytes), messageSet2.next.message)
-      }
+      assertTrue("Message set should have 1 message", messageSet.hasNext)
+
+      assertEquals(new Message("test1".getBytes), messageSet.next.message)
+      assertTrue("Message set should have 1 message", messageSet.hasNext)
+      assertEquals(new Message("test1".getBytes), messageSet.next.message)
     } catch {
       case e: Exception => fail("Not expected", e)
     } finally {
@@ -178,65 +176,6 @@
   }
 
   @Test
-  def testZKSendToExistingTopicWithNoBrokers() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
-
-    val config = new ProducerConfig(props)
-
-    val producer = new Producer[String, String](config)
-    var server: KafkaServer = null
-
-    // create topic
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-
-    try {
-      // Available partition ids should be 0, 1, 2 and 3. The data in both cases should get sent to partition 0 and
-      // all partitions have broker 0 as the leader.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-      // cross check if brokers got the messages
-      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet1 = response1.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet1.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet1.next.message)
-
-      // shutdown server2
-      server2.shutdown
-      server2.awaitShutdown()
-      Thread.sleep(100)
-      // delete the new-topic logs
-      Utils.rm(server2.config.logDir)
-      Thread.sleep(100)
-      // start it up again. So broker 2 exists under /broker/ids, but nothing exists under /broker/topics/new-topic
-      val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
-      val config2 = new KafkaConfig(props2) {
-        override val numPartitions = 4
-      }
-      server = TestUtils.createServer(config2)
-      Thread.sleep(100)
-
-      // now there are no brokers registered under test-topic.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
-      Thread.sleep(100)
-
-      // cross check if brokers got the messages
-      val response2 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-      val messageSet2 = response2.messageSet("new-topic", 0).iterator
-      assertTrue("Message set should have 1 message", messageSet2.hasNext)
-      assertEquals(new Message("test".getBytes), messageSet2.next.message)
-
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    } finally {
-      if(server != null) server.shutdown
-      producer.close
-    }
-  }
-
-  @Test
   def testAsyncSendCanCorrectlyFailWithTimeout() {
     val timeoutMs = 500
     val props = new Properties()
Index: core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala	(working copy)
@@ -51,6 +51,7 @@
     val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
 
     servers ++= List(server1, server2)
+    try {
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -59,7 +60,7 @@
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -68,7 +69,7 @@
     servers.head.shutdown()
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     Thread.sleep(zookeeper.tickTime)
@@ -85,9 +86,12 @@
 
     // test if the leader is the preferred replica
     assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+    }finally {
     // shutdown the servers and delete data hosted on them
     servers.map(server => server.shutdown())
     servers.map(server => Utils.rm(server.config.logDir))
+
+    }
   }
 
   // Assuming leader election happens correctly, test if epoch changes as expected
@@ -105,17 +109,17 @@
 
       var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
       assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
-      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
 
       ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
       newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
       assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
-      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
 
     }finally {
       TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
Index: core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(revision 0)
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import collection.mutable.HashMap
+import collection.mutable.Map
+import kafka.cluster.{Partition, Replica}
+import org.easymock.EasyMock
+import kafka.log.Log
+import kafka.utils.{Time, MockTime, TestUtils}
+import org.junit.Assert._
+import org.I0Itec.zkclient.ZkClient
+
+class ISRExpirationTest extends JUnit3Suite {
+
+  var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]() // NOTE(review): not referenced anywhere else in this class
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) { // two broker configs with overridden in-sync thresholds
+    override val keepInSyncTimeMs = 100L // the tests advance the mock clock by 150 to exceed this
+    override val keepInSyncBytes = 10L
+  })
+  val topic = "foo"
+
+  def testISRExpirationForStuckFollowers() { // a follower that has not pulled data for longer than keepInSyncTimeMs must be reported out of sync
+    val time = new MockTime
+    // mock the leader replica's log: logEndOffset is expected exactly 12 times and setHW(5) once (checked by verify below)
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
+    EasyMock.expect(log.setHW(5L)).times(1)
+    EasyMock.replay(log)
+
+    // add one partition
+    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 5L)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    // set remote replicas leo to something low, like 2
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
+
+    time.sleep(150) // advance the mock clock past keepInSyncTimeMs (100)
+    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+
+    var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+    // add all replicas back to the ISR
+    partition0.inSyncReplicas ++= partition0.assignedReplicas()
+    assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+
+    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    // let the follower catch up only up to 3
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
+    time.sleep(150)
+    // now follower broker id 1 has caught up to only 3, while the leader is at 5 AND follower broker id 1 hasn't
+    // pulled any data for > keepInSyncTimeMs ms. So it is stuck
+    partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
+  def testISRExpirationForSlowFollowers() { // presumably exercises the byte-lag criterion: follower LEO 4 vs. leader log end offset 15, keepInSyncBytes = 10
+    val time = new MockTime
+    // mock log for the leader replica (log end offset 15, setHW(15) expected once)
+    val log = getLogWithHW(15L)
+    // add one partition
+    val partition0 = getPartitionWithAllReplicasInISR(topic, 0, time, configs.head.brokerId, log, 15L)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+    // set remote replicas leo to something low, like 4
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
+
+    time.sleep(150)
+    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    time.sleep(10)
+    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds))) // follower's LEO update time is set to "now"; presumably so the time criterion alone would not flag it
+
+    val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+    EasyMock.verify(log) // fails unless the mocked log saw exactly the calls declared in getLogWithHW
+  }
+
+  def testISRExpirationForMultiplePartitions() { // two partitions led by the first broker must each report the second broker's replica as out of sync
+    val time = new MockTime
+    // mock zkclient (no expectations are recorded before replay)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, time, zkClient)
+    try {
+      val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+      // create leader log
+      val log0 = getLogWithHW(5L)
+
+      // create leader and follower replicas
+      val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
+      val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
+
+      partition0.inSyncReplicas = Set(followerReplicaPartition0, leaderReplicaPartition0)
+      // set the leader and its hw and the hw update time
+      partition0.leaderId(Some(configs.head.brokerId))
+      partition0.leaderHW(Some(5L))
+
+      // set the leo for non-leader replicas to something low
+      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+
+      val log1 = getLogWithHW(15L)
+      // create leader and follower replicas for partition 1
+      val partition1 = replicaManager.getOrCreatePartition(topic, 1, configs.map(_.brokerId).toSet)
+      val leaderReplicaPartition1 = replicaManager.addLocalReplica(topic, 1, log1, configs.map(_.brokerId).toSet)
+      val followerReplicaPartition1 = replicaManager.addRemoteReplica(topic, 1, configs.last.brokerId, partition1) // was partition0: the follower must belong to partition 1
+
+      partition1.inSyncReplicas = Set(followerReplicaPartition1, leaderReplicaPartition1)
+      // set the leader and its hw and the hw update time
+      partition1.leaderId(Some(configs.head.brokerId))
+      partition1.leaderHW(Some(15L))
+
+      // set the leo for non-leader replicas to something low
+      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
+
+      time.sleep(150) // advance the mock clock past keepInSyncTimeMs (100)
+      leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
+      leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
+      time.sleep(10)
+      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds))) // only partition 1's follower gets a fresh update time
+
+      val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+      assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
+
+      val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+      assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId))
+
+      EasyMock.verify(log0)
+      EasyMock.verify(log1)
+    }finally {
+      // No catch-all here: printing and swallowing the exception would hide assertion and mock
+      // failures and let this test pass unconditionally. The replica manager is still closed.
+      replicaManager.close()
+    }
+  }
+
+  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
+                                               localLog: Log, leaderHW: Long): Partition = { // builds a partition whose ISR equals its full set of assigned replicas
+    val partition = new Partition(topic, partitionId, time)
+    val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog)) // only the leader replica is given a log
+
+    val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica // followers come from the other entries in configs
+    partition.assignedReplicas(Some(allReplicas.toSet))
+    // set in sync replicas for this partition to all the assigned replicas
+    partition.inSyncReplicas = allReplicas.toSet
+    // set the leader and its hw and the hw update time
+    partition.leaderId(Some(leaderId))
+    partition.leaderHW(Some(leaderHW)) // NOTE(review): presumably what triggers the setHW call the log mocks expect - confirm in Partition
+    partition
+  }
+
+  private def getLogWithHW(hw: Long): Log = {
+    val mockedLog = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(mockedLog.logEndOffset).andReturn(hw).times(6)
+    EasyMock.expect(mockedLog.setHW(hw)).times(1)
+    EasyMock.replay(mockedLog)
+    // returned in replay state: logEndOffset yields hw (6 calls expected), setHW(hw) expected once; callers verify it
+    mockedLog
+  }
+
+  private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
+    // one replica (constructed without a log argument) per configured broker other than the leader
+    for(followerConfig <- configs if followerConfig.brokerId != leaderId)
+      yield new Replica(followerConfig.brokerId, partition, topic)
+  }
+}
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala	(working copy)
@@ -20,7 +20,6 @@
 import scala.collection._
 import org.junit.{After, Before, Test}
 import junit.framework.Assert._
-import kafka.server._
 import kafka.message._
 import kafka.api._
 import kafka.utils.TestUtils
Index: core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala	(revision 0)
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import kafka.producer.ProducerData
+import kafka.serializer.StringEncoder
+import kafka.admin.CreateTopicCommand
+import kafka.cluster.{Replica, Partition, Broker}
+import kafka.utils.{MockTime, TestUtils}
+import junit.framework.Assert._
+import java.io.File
+import kafka.log.Log
+
+class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
+  val props = createBrokerConfigs(2)
+  val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
+  var brokers: Seq[KafkaServer] = null
+  val topic = "foobar"
+
+  override def setUp() {
+    super.setUp()
+    brokers = configs.map(config => TestUtils.createServer(config))
+  }
+
+  override def tearDown() {
+    // mirror setUp in reverse: stop the brokers first, then the harness they were started on top of
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+  // checks the follower's log end offset after a ReplicaFetcherThread ran against a leader that received messages
+  def testReplicaFetcherThread() {
+    val partition = 0
+    val testMessageList = List("test1", "test2", "test3", "test4")
+    val leaderBrokerId = configs.head.brokerId
+    val followerBrokerId = configs.last.brokerId
+    val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
+
+    // create a topic and partition
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+
+    // send test messages to leader; the producer is closed even if the send fails
+    val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
+    try producer.send(new ProducerData[String, String](topic, "test", testMessageList))
+    finally producer.close()
+
+    // create the follower's log in a tmp directory
+    val replicaLogDir = new File(TestUtils.tempDir(), topic + "-" + partition)
+    replicaLogDir.mkdirs()
+    val replicaLog = new Log(replicaLogDir, 500, 500, false)
+    try {
+      // create replica fetch thread
+      val testPartition = new Partition(topic, partition, new MockTime)
+      testPartition.leaderId(Some(leaderBrokerId))
+      val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog))
+      val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last)
+      // start a replica fetch thread to the above broker, give it time to fetch and always stop it again
+      replicaFetchThread.start()
+      try Thread.sleep(700)
+      finally replicaFetchThread.shutdown()
+      assertEquals(60L, testReplica.log.get.logEndOffset)
+    } finally {
+      replicaLog.close()
+    }
+  }
+}
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala	(revision 1344101)
+++ core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala	(working copy)
@@ -43,7 +43,7 @@
 
   @Test
   def testPartition() {
-    assertTrue(new Partition("foo", 10) == new Partition("foo", 10))
-    assertTrue(new Partition("foo", 1) != new Partition("foo", 0))
+    assertTrue(new Partition("foo", 10).equals(new Partition("foo", 10)))
+    assertTrue(!new Partition("foo", 1).equals(new Partition("foo", 0)))
   }
 }
Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties	(revision 1344101)
+++ core/src/test/resources/log4j.properties	(working copy)
@@ -18,7 +18,7 @@
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=INFO
+log4j.logger.kafka=ERROR
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
 log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
Index: core/src/main/scala/kafka/log/SegmentList.scala
===================================================================
--- core/src/main/scala/kafka/log/SegmentList.scala	(revision 1344101)
+++ core/src/main/scala/kafka/log/SegmentList.scala	(working copy)
@@ -72,8 +72,30 @@
     }
     deleted
   }
-  
+
   /**
+   * Delete the items after position newEnd until end of list (the item at newEnd itself is kept)
+   */
+  def truncLast(newEnd: Int): Seq[T] = {
+    var deleted: Array[T] = null
+    while(deleted == null) {
+      val curr = contents.get()
+      // validate against the snapshot that gets copied, so a concurrent update cannot invalidate the check
+      if(newEnd < -1 || newEnd >= curr.length - 1)
+        throw new IllegalArgumentException("End index %d must be in [-1, %d)".format(newEnd, curr.length - 1))
+      val newLength = newEnd + 1
+      val updated = new Array[T](newLength)
+      Array.copy(curr, 0, updated, 0, newLength)
+      // retry from a fresh snapshot if another thread replaced the contents in the meantime
+      if(contents.compareAndSet(curr, updated)) {
+        deleted = new Array[T](curr.length - newLength)
+        Array.copy(curr, newLength, deleted, 0, curr.length - newLength)
+      }
+    }
+    deleted
+  }
+
+  /**
    * Get a consistent view of the sequence
    */
   def view: Array[T] = contents.get()
Index: core/src/main/scala/kafka/log/LogStats.scala
===================================================================
--- core/src/main/scala/kafka/log/LogStats.scala	(revision 1344101)
+++ core/src/main/scala/kafka/log/LogStats.scala	(working copy)
@@ -36,7 +36,7 @@
   
   def getNumberOfSegments: Int = log.numberOfSegments
   
-  def getCurrentOffset: Long = log.getHighwaterMark
+  def getCurrentOffset: Long = log.highwaterMark
   
   def getNumAppendedMessages: Long = numCumulatedMessages.get
 
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1333546)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -17,17 +17,18 @@
 
 package kafka.log
 
-import java.util.concurrent.atomic._
+import kafka.api.OffsetRequest
+import java.io.{IOException, RandomAccessFile, File}
+import java.util.{Comparator, Collections, ArrayList}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
+import kafka.message.{MessageSet, InvalidMessageException, FileMessageSet}
+import kafka.utils._
 import java.text.NumberFormat
-import java.io._
-import kafka.message._
-import kafka.utils._
-import kafka.common._
-import kafka.api.OffsetRequest
-import java.util._
+import kafka.common.OffsetOutOfRangeException
 
-private[kafka] object Log {
+object Log {
   val FileSuffix = ".kafka"
+  val hwFileName = "highwatermark"
 
   /**
    * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
@@ -75,9 +76,9 @@
     nf.setMinimumIntegerDigits(20)
     nf.setMaximumFractionDigits(0)
     nf.setGroupingUsed(false)
-    nf.format(offset) + Log.FileSuffix
+    nf.format(offset) + FileSuffix
   }
-  
+
   def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
     if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
       return Array(0L)
@@ -86,10 +87,11 @@
   }
 }
 
+
 /**
  * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size 
  */
-private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
   @volatile var deleted = false
   def size: Long = messageSet.highWaterMark
   override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
@@ -100,8 +102,11 @@
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
+private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean)
+  extends Logging {
 
+  import kafka.log.Log._
+
   /* A lock that guards all modifications to the log */
   private val lock = new Object
 
@@ -114,54 +119,60 @@
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
 
-  /* The name of this log */
-  val name  = dir.getName()
 
+  /* create the leader highwatermark file handle */
+  private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
+
+  private var hw: Long = 0
+
   private val logStats = new LogStats(this)
 
-  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
 
+  /* The name of this log */
+  def name  = dir.getName()
+
   /* Load the log segments from the log files on disk */
   private def loadSegments(): SegmentList[LogSegment] = {
     // open all the segments read-only
-    val accum = new ArrayList[LogSegment]
+    val logSegments = new ArrayList[LogSegment]
     val ls = dir.listFiles()
     if(ls != null) {
-      for(file <- ls if file.isFile && file.toString.endsWith(Log.FileSuffix)) {
+      for(file <- ls if file.isFile && file.toString.endsWith(FileSuffix)) {
         if(!file.canRead)
           throw new IOException("Could not read file " + file)
         val filename = file.getName()
-        val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
+        val start = filename.substring(0, filename.length - FileSuffix.length).toLong
         val messageSet = new FileMessageSet(file, false)
-        accum.add(new LogSegment(file, messageSet, start))
+        logSegments.add(new LogSegment(file, messageSet, start))
       }
     }
 
-    if(accum.size == 0) {
+    if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment
-      val newFile = new File(dir, Log.nameFromOffset(0))
+      val newFile = new File(dir, nameFromOffset(0))
       val set = new FileMessageSet(newFile, true)
-      accum.add(new LogSegment(newFile, set, 0))
+      logSegments.add(new LogSegment(newFile, set, 0))
     } else {
       // there is at least one existing segment, validate and recover them/it
       // sort segments into ascending order for fast searching
-      Collections.sort(accum, new Comparator[LogSegment] {
+      Collections.sort(logSegments, new Comparator[LogSegment] {
         def compare(s1: LogSegment, s2: LogSegment): Int = {
           if(s1.start == s2.start) 0
           else if(s1.start < s2.start) -1
           else 1
         }
       })
-      validateSegments(accum)
+      validateSegments(logSegments)
 
       //make the final section mutable and run recovery on it if necessary
-      val last = accum.remove(accum.size - 1)
+      val last = logSegments.remove(logSegments.size - 1)
       last.messageSet.close()
       info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
       val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
-      accum.add(mutable)
+      logSegments.add(mutable)
     }
-    new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
+    new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
   }
 
   /**
@@ -192,6 +203,8 @@
     lock synchronized {
       for(seg <- segments.view)
         seg.messageSet.close()
+      checkpointHW()
+      hwFile.close()
     }
   }
 
@@ -273,9 +286,14 @@
   /**
    *  get the current high watermark of the log
    */
-  def getHighwaterMark: Long = segments.view.last.messageSet.highWaterMark
+  def highwaterMark: Long = segments.view.last.messageSet.highWaterMark
 
   /**
+   *  get the offset of the last message in the log
+   */
+  def logEndOffset: Long = segments.view.last.messageSet.getEndOffset()
+
+  /**
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
@@ -290,7 +308,7 @@
     lock synchronized {
       val last = segments.view.last
       val newOffset = nextAppendOffset
-      val newFile = new File(dir, Log.nameFromOffset(newOffset))
+      val newFile = new File(dir, nameFromOffset(newOffset))
       debug("Rolling log '" + name + "' to " + newFile.getName())
       segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
     }
@@ -317,6 +335,7 @@
       segments.view.last.messageSet.flush()
       unflushed.set(0)
       lastflushedTime.set(System.currentTimeMillis)
+      checkpointHW()
      }
   }
 
@@ -359,8 +378,66 @@
     }
     ret
   }
- 
-  def getTopicName():String = {
+
+  /* Closes each given segment and deletes its file; returns the number of files that were actually deleted */
+  def deleteSegments(segments: Seq[LogSegment]): Int = {
+    var deletedCount = 0
+    segments.foreach { doomed =>
+      info("Deleting log segment " + doomed.file.getName() + " from " + name)
+      // the close goes through swallow(), presumably so that a close failure does not prevent the delete
+      swallow(doomed.messageSet.close())
+      if(doomed.file.delete())
+        deletedCount += 1
+      else
+        warn("Delete failed.")
+    }
+    deletedCount
+  }
+
+  /**
+   * Truncates the log to the last high watermark checkpointed in the hw file: segments after the one containing
+   * the hw are dropped from the segment list and deleted, and that segment is truncated up to the hw.
+   */
+  def recoverUptoLastCheckpointedHW() {
+    if(hwFile.length() > 0) {
+      // read the last checkpointed hw from disk
+      hwFile.seek(0)
+      val lastKnownHW = hwFile.readLong()
+      val currentSegments = segments.view
+      // find the log segment that has this hw
+      currentSegments.find(s => lastKnownHW >= s.start && lastKnownHW < s.messageSet.getEndOffset()) match {
+        case Some(segment) =>
+          val truncatedSegmentIndex = currentSegments.indexOf(segment)
+          // truncLast rejects the index of the last segment, and in that case nothing follows it anyway
+          if(truncatedSegmentIndex < currentSegments.size - 1) {
+            // delete the dropped segments' files as well, otherwise loadSegments() picks them up again
+            val numDeleted = deleteSegments(segments.truncLast(truncatedSegmentIndex))
+            info("Deleted %d segments past highwatermark %d from %s".format(numDeleted, lastKnownHW, name))
+          }
+          segment.messageSet.truncateUpto(lastKnownHW)
+          // report the value read from the checkpoint; this method never assigns the in-memory hw field
+          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, lastKnownHW))
+        case None =>
+          assert(lastKnownHW <= segments.view.last.messageSet.size,
+            "Last checkpointed hw cannot be greater than the latest message in the log")
+          error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
+            .format(lastKnownHW, segments.view.head.start, segments.view.last.messageSet.size))
+      }
+    }else
+      info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
+  }
+
+  def setHW(latestLeaderHW: Long) {  // only updates the in-memory hw; checkpointHW() writes it to the hw file
+    hw = latestLeaderHW
+  }
+
+  def checkpointHW() {  // writes the in-memory hw to the hw file
+    hwFile.seek(0)  // the value is always written at the start of the file, replacing the previous one
+    hwFile.writeLong(hw)
+    hwFile.getChannel.force(true)  // force content and metadata updates out to the storage device
+  }
+
+  def topicName():String = {
     name.substring(0, name.lastIndexOf("-"))
   }
 
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1344101)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -24,6 +24,7 @@
 import kafka.server.KafkaConfig
 import kafka.common.{InvalidTopicException, InvalidPartitionException}
 import kafka.api.OffsetRequest
+import kafka.log.Log._
 
 /**
  * The guy who creates and hands out logs
@@ -144,7 +145,7 @@
     val log = getLog(offsetRequest.topic, offsetRequest.partition)
     log match {
       case Some(l) => l.getOffsetsBefore(offsetRequest)
-      case None => Log.getEmptyOffsets(offsetRequest)
+      case None => getEmptyOffsets(offsetRequest)
     }
   }
 
@@ -190,28 +191,13 @@
     log
   }
 
-  /* Attemps to delete all provided segments from a log and returns how many it was able to */
-  private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
-    var total = 0
-    for(segment <- segments) {
-      info("Deleting log segment " + segment.file.getName() + " from " + log.name)
-      swallow(segment.messageSet.close())
-      if(!segment.file.delete()) {
-        warn("Delete failed.")
-      } else {
-        total += 1
-      }
-    }
-    total
-  }
-
   /* Runs through the log removing segments older than a certain age */
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
-    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val topic = Utils.getTopicPartition(log.name)._1
     val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
     val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
-    val total = deleteSegments(log, toBeDeleted)
+    val total = log.deleteSegments(toBeDeleted)
     total
   }
 
@@ -231,7 +217,7 @@
       }
     }
     val toBeDeleted = log.markDeletedWhile( shouldDelete )
-    val total = deleteSegments(log, toBeDeleted)
+    val total = log.deleteSegments(toBeDeleted)
     total
   }
 
@@ -292,16 +278,16 @@
       try{
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
         var logFlushInterval = config.defaultFlushIntervalMs
-        if(logFlushIntervalMap.contains(log.getTopicName))
-          logFlushInterval = logFlushIntervalMap(log.getTopicName)
-        debug(log.getTopicName + " flush interval  " + logFlushInterval +
+        if(logFlushIntervalMap.contains(log.topicName))
+          logFlushInterval = logFlushIntervalMap(log.topicName)
+        debug(log.topicName + " flush interval  " + logFlushInterval +
             " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
       }
       catch {
         case e =>
-          error("Error flushing topic " + log.getTopicName, e)
+          error("Error flushing topic " + log.topicName, e)
           e match {
             case _: IOException =>
               fatal("Halting due to unrecoverable I/O error while flushing logs: " + e.getMessage, e)
Index: core/src/main/scala/kafka/cluster/Replica.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Replica.scala	(revision 1344101)
+++ core/src/main/scala/kafka/cluster/Replica.scala	(working copy)
@@ -18,6 +18,123 @@
 package kafka.cluster
 
 import kafka.log.Log
+import kafka.server.{KafkaConfig, ReplicaFetcherThread}
+import java.lang.IllegalStateException
+import kafka.utils.Logging
 
-case class Replica(brokerId: Int, partition: Partition, topic: String,
-                   var log: Option[Log] = None, var hw: Long = -1, var leo: Long = -1, isLocal: Boolean = false)
\ No newline at end of file
+/** A replica of a topic partition on the broker with id brokerId; it counts as local when a log is attached. */
+class Replica(val brokerId: Int,
+              val partition: Partition,
+              val topic: String,
+              var log: Option[Log] = None,
+              var leoUpdateTime: Long = -1L) extends Logging {
+  // last log end offset recorded through logEndOffset(Some(...)); only used when there is no local log
+  private var logEndOffset: Long = -1L
+  // fetcher thread of this replica, null unless startReplicaFetcherThread has been called
+  private var replicaFetcherThread: ReplicaFetcherThread = null
+
+  /**
+   * Get or set the log end offset of this replica.
+   * @param newLeo the offset to record, or None to only read the current one
+   * @return the end offset of the local log if there is one, otherwise the last recorded offset (-1 initially)
+   * @throws IllegalStateException if an offset is passed for a replica that has a local log
+   */
+  def logEndOffset(newLeo: Option[Long] = None): Long = {
+    (log, newLeo) match {
+      case (Some(_), Some(newOffset)) =>
+        throw new IllegalStateException("Trying to set the leo %d for local log".format(newOffset))
+      case (Some(localLog), None) => localLog.logEndOffset
+      case (None, Some(newOffset)) =>
+        logEndOffset = newOffset
+        newOffset
+      case (None, None) => logEndOffset
+    }
+  }
+
+  /**
+   * Get or set the time at which the log end offset was last updated.
+   * @param time the new update time, or None to only read the current one
+   * @return the update time after applying the optional change
+   */
+  def logEndOffsetUpdateTime(time: Option[Long] = None): Long = {
+    time.foreach(t => leoUpdateTime = t)
+    leoUpdateTime
+  }
+
+  /** A replica is local if and only if it has a log attached. */
+  def isLocal: Boolean = log.isDefined
+
+  /**
+   * Get or set the high watermark of this replica; both directions need a local log.
+   * NOTE(review): the setter stores through log.setHW while the getter returns log.highwaterMark - confirm
+   * that these two refer to the same value.
+   * @param highwaterMarkOpt the high watermark to set, or None to read it
+   * @return the value that was set, or the local log's high watermark when reading
+   * @throws IllegalStateException if this replica has no local log
+   */
+  def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = {
+    highwaterMarkOpt match {
+      case Some(highwaterMark) =>
+        if(!isLocal)
+          throw new IllegalStateException("Unable to set highwatermark for topic %s ".format(topic) +
+            "partition %d on broker %d, since there is no local log for this partition"
+              .format(partition.partitionId, brokerId))
+        trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
+                                                                               brokerId, highwaterMark))
+        log.get.setHW(highwaterMark)
+        highwaterMark
+      case None =>
+        if(!isLocal)
+          throw new IllegalStateException("Unable to get highwatermark for topic %s ".format(topic) +
+            "partition %d on broker %d, since there is no local log for this partition"
+              .format(partition.partitionId, brokerId))
+        log.get.highwaterMark
+    }
+  }
+
+  /** Create and start a daemon ReplicaFetcherThread for this replica against the given leader broker. */
+  def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) {
+    val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId)
+    replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config)
+    replicaFetcherThread.setDaemon(true)
+    replicaFetcherThread.start()
+  }
+
+  /** Shut down the fetcher thread, if any, and forget it so that it cannot be shut down twice. */
+  def stopReplicaFetcherThread() {
+    if(replicaFetcherThread != null) {
+      replicaFetcherThread.shutdown()
+      replicaFetcherThread = null
+    }
+  }
+
+  /** @return (true, leader broker id) while a fetcher thread exists, (false, -1) otherwise */
+  def getIfFollowerAndLeader(): (Boolean, Int) = {
+    if(replicaFetcherThread != null) (true, replicaFetcherThread.getLeader().id)
+    else (false, -1)
+  }
+
+  /** Release the resources of this replica, which currently is only the fetcher thread. */
+  def close() {
+    // clears the thread reference as well, so the replica does not keep reporting itself as a follower
+    stopReplicaFetcherThread()
+  }
+
+  /** Replicas are equal when their topic, broker id and partition are equal. */
+  override def equals(that: Any): Boolean = that match {
+    case other: Replica => topic.equals(other.topic) && brokerId == other.brokerId && partition.equals(other.partition)
+    case _ => false
+  }
+
+  override def hashCode(): Int = 31 + topic.hashCode() + 17*brokerId + partition.hashCode()
+
+  override def toString(): String = {
+    val replicaString = new StringBuilder
+    replicaString.append("ReplicaId: " + brokerId)
+    replicaString.append("; Topic: " + topic)
+    replicaString.append("; Partition: " + partition.toString)
+    replicaString.append("; isLocal: " + isLocal)
+    if(isLocal) replicaString.append("; Highwatermark: " + highWatermark())
+    replicaString.toString()
+  }
+}
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1344101)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -16,12 +16,168 @@
  */
 package kafka.cluster
 
+import kafka.common.NoLeaderForPartitionException
+import kafka.utils.{SystemTime, Time, Logging}
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.ZkUtils._
+import java.util.concurrent.locks.ReentrantLock
+import java.lang.IllegalStateException
+
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
- * TODO: Commit queue to be added as part of KAFKA-46. Add AR, ISR, CUR, RAR state maintenance as part of KAFKA-302
  */
-case class Partition(topic: String, val partId: Int, var leader: Option[Replica] = None,
-                     assignedReplicas: Set[Replica] = Set.empty[Replica],
-                     inSyncReplicas: Set[Replica] = Set.empty[Replica],
-                     catchUpReplicas: Set[Replica] = Set.empty[Replica],
-                     reassignedReplicas: Set[Replica] = Set.empty[Replica])
+class Partition(val topic: String,
+                val partitionId: Int,
+                time: Time = SystemTime,
+                var inSyncReplicas: Set[Replica] = Set.empty[Replica]) extends Logging {
+  private var leaderReplicaId: Option[Int] = None
+  private var assignedReplicas: Set[Replica] = Set.empty[Replica]
+  private var highWatermarkUpdateTime: Long = -1L
+  // held while the leader id is read or changed and while the ISR is updated
+  private val leaderISRUpdateLock = new ReentrantLock()
+
+  /** Get the leader id (None if no leader has been set), after first replacing it with newLeader if given. */
+  def leaderId(newLeader: Option[Int] = None): Option[Int] = {
+    // acquire the lock outside the try so that the finally block never unlocks a lock that is not held
+    leaderISRUpdateLock.lock()
+    try {
+      if(newLeader.isDefined) {
+        info("Updating leader for for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
+        leaderReplicaId = newLeader
+      }
+      leaderReplicaId
+    }finally {
+      leaderISRUpdateLock.unlock()
+    }
+  }
+
+  /** Get the replicas assigned to this partition, after first replacing them with the given set if any. */
+  def assignedReplicas(replicas: Option[Set[Replica]] = None): Set[Replica] = {
+    replicas.foreach(ar => assignedReplicas = ar)
+    assignedReplicas
+  }
+
+  /** @return the assigned replica that lives on the given broker, if there is one */
+  def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
+
+  /** Add the replica to the assigned replicas; returns false if it was already assigned. */
+  def addReplica(replica: Replica): Boolean = {
+    if(!assignedReplicas.contains(replica)) {
+      assignedReplicas += replica
+      true
+    }else false
+  }
+
+  /** Record a new log end offset for the replica and stamp the update with the current time. */
+  def updateReplicaLEO(replica: Replica, leo: Long) {
+    replica.leoUpdateTime = time.milliseconds
+    replica.logEndOffset(Some(leo))
+    debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
+  }
+
+  /** @return the assigned replica of the current leader; throws if no leader is set or it is not assigned */
+  def leaderReplica(): Replica = {
+    leaderId() match {
+      case Some(id) =>
+        getReplica(id).getOrElse(
+          throw new IllegalStateException("No replica for leader %d in the replica manager".format(id)))
+      case None =>
+        throw new NoLeaderForPartitionException("Leader for topic %s partition %d does not exist"
+          .format(topic, partitionId))
+    }
+  }
+
+  /** Get the high watermark of the leader replica, or set it (and record the update time) if newHw is given. */
+  def leaderHW(newHw: Option[Long] = None): Long = {
+    newHw match {
+      case Some(highWatermark) =>
+        leaderReplica().highWatermark(newHw)
+        highWatermarkUpdateTime = time.milliseconds
+        highWatermark
+      case None =>
+        leaderReplica().highWatermark()
+    }
+  }
+
+  /** @return the time of the last high watermark update through leaderHW, -1 if there has been none */
+  def hwUpdateTime: Long = highWatermarkUpdateTime
+
+  def getOutOfSyncReplicas(keepInSyncTimeMs: Long, keepInSyncBytes: Long): Set[Replica] = {
+    /**
+     * there are two cases that need to be handled here -
+     * 1. Stuck followers: If the leo of the replica is less than the leo of leader and the leo hasn't been updated
+     *                     for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
+     * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
+     *                     follower is not catching up and should be removed from the ISR
+    **/
+    // resolve the leader once instead of once per in-sync replica; its leo is still read for every comparison
+    val leader = leaderReplica()
+    // Case 1 above
+    val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leader.logEndOffset())
+    info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
+      possiblyStuckReplicas.map(_.brokerId).mkString(",")))
+    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime() < (time.milliseconds - keepInSyncTimeMs))
+    info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
+    // Case 2 above
+    val slowReplicas = inSyncReplicas.filter(r => (leader.logEndOffset() - r.logEndOffset()) > keepInSyncBytes)
+    info("Slow replicas for topic %s partition %d are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
+    stuckReplicas ++ slowReplicas
+  }
+
+  /** Replace the ISR, in ZK first (if a client is given) and then in the cache; failures surface as IllegalStateException. */
+  def updateISR(newISR: Set[Int], zkClientOpt: Option[ZkClient] = None) {
+    // acquire the lock outside the try so that the finally block never unlocks a lock that is not held
+    leaderISRUpdateLock.lock()
+    try {
+      // update ISR in ZK
+      zkClientOpt.foreach(zkClient => updateISRInZk(newISR, zkClient))
+      // update partition's ISR in cache
+      inSyncReplicas = newISR.map {r =>
+        getReplica(r) match {
+          case Some(replica) => replica
+          case None => throw new IllegalStateException("ISR update failed. No replica for id %d".format(r))
+        }
+      }
+      info("Updated ISR for for topic %s partition %d to %s in cache".format(topic, partitionId, newISR.mkString(",")))
+    }catch {
+      case e => throw new IllegalStateException("Failed to update ISR for topic %s ".format(topic) +
+        "partition %d to %s".format(partitionId, newISR.mkString(",")), e)
+    }finally {
+      leaderISRUpdateLock.unlock()
+    }
+  }
+
+  /** Overwrite the ISR stored in ZK for this partition, keeping the epoch (last ';' separated field) stored with it. */
+  private def updateISRInZk(newISR: Set[Int], zkClient: ZkClient) = {
+    val replicaListAndEpochString = readDataMaybeNull(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString))
+    if(replicaListAndEpochString == null) {
+      throw new NoLeaderForPartitionException(("Illegal partition state. ISR cannot be updated for topic " +
+        "%s partition %d since leader and ISR does not exist in ZK".format(topic, partitionId)))
+    }
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      val epoch = replicasAndEpochInfo.last
+      updatePersistentPath(zkClient, getTopicPartitionInSyncPath(topic, partitionId.toString),
+        "%s;%s".format(newISR.mkString(","), epoch))
+      info("Updating ISR for for topic %s partition %d to %s in ZK".format(topic, partitionId, newISR.mkString(",")))
+    }
+  }
+
+  /** Partitions are equal when they have the same topic and partition id. */
+  override def equals(that: Any): Boolean = that match {
+    case other: Partition => topic.equals(other.topic) && partitionId == other.partitionId
+    case _ => false
+  }
+
+  override def hashCode(): Int = 31 + topic.hashCode() + 17*partitionId
+
+  override def toString(): String = {
+    val partitionString = new StringBuilder
+    partitionString.append("Topic: " + topic)
+    partitionString.append("; Partition: " + partitionId)
+    partitionString.append("; Leader: " + leaderId())
+    partitionString.append("; Assigned replicas: " + assignedReplicas().map(_.brokerId).mkString(","))
+    partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.toString()
+  }
+}
Index: core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
===================================================================
--- core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(revision 1344101)
+++ core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala	(working copy)
@@ -55,14 +55,14 @@
       m.leader match {
         case Some(leader) =>
           val leaderReplica = new Replica(leader.id, partition, topic)
-          partition.leader = Some(leaderReplica)
+          partition.leaderId(Some(leaderReplica.brokerId))
           debug("Topic %s partition %d has leader %d".format(topic, m.partitionId, leader.id))
           partition
         case None =>
           debug("Topic %s partition %d does not have a leader yet".format(topic, m.partitionId))
           partition
       }
-    }.sortWith((s, t) => s.partId < t.partId)
+    }.sortWith((s, t) => s.partitionId < t.partitionId)
   }
 
   /**
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1344101)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -98,11 +98,8 @@
       val partitionIndex = getPartition(event.getKey, totalNumPartitions)
       val brokerPartition = topicPartitionsList(partitionIndex)
 
-      val leaderBrokerId = brokerPartition.leader match {
-        case Some(leader) => leader.brokerId
-        case None => -1
-        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
-      }
+      // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+      val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
 
       var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
       ret.get(leaderBrokerId) match {
@@ -113,7 +110,7 @@
           ret.put(leaderBrokerId, dataPerBroker)
       }
 
-      val topicAndPartition = (event.getTopic, brokerPartition.partId)
+      val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
       var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
       dataPerBroker.get(topicAndPartition) match {
         case Some(element) =>
@@ -131,7 +128,7 @@
     debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
     val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic)
     debug("Broker partitions registered for topic: %s are %s"
-      .format(pd.getTopic, topicPartitionsList.map(p => p.partId).mkString(",")))
+      .format(pd.getTopic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
     val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
     topicPartitionsList
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1344101)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -146,6 +146,8 @@
     */
   def highWaterMark(): Long = setHighWaterMark.get()
 
+  def getEndOffset(): Long = offset + sizeInBytes()  // NOTE(review): presumably the position just past the last byte of this set - confirm
+
   def checkMutable(): Unit = {
     if(!mutable)
       throw new IllegalStateException("Attempt to invoke mutation on immutable message set.")
@@ -208,7 +210,13 @@
     needRecover.set(false)    
     len - validUpTo
   }
-  
+
+  def truncateUpto(hw: Long) = {  // truncates the channel to hw, then sets both the size and high watermark counters to hw
+    channel.truncate(hw)  // NOTE(review): presumably hw is a byte position in this file, not a logical offset - confirm with callers
+    setSize.set(hw)
+    setHighWaterMark.set(hw)
+  }
+
   /**
    * Read, validate, and discard a single message, returning the next valid offset, and
    * the message being validated
Index: core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala
===================================================================
--- core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala	(revision 1344101)
+++ core/src/main/scala/kafka/common/NotLeaderForPartitionException.scala	(working copy)
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package kafka.common
 
 /**
- * Exception raised when broker receives a produce message for partition it does not lead
- * @param message - A more detailed and descriptive error message
+ * Thrown when a request is made for a partition on a broker that is NOT the leader for that partition
  */
-class NotLeaderForPartitionException(message: String) extends Exception(message) {
+class NotLeaderForPartitionException(message: String) extends RuntimeException(message) {
   def this() = this(null)
-}
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/common/ErrorMapping.scala
===================================================================
--- core/src/main/scala/kafka/common/ErrorMapping.scala	(revision 1344101)
+++ core/src/main/scala/kafka/common/ErrorMapping.scala	(working copy)
@@ -20,6 +20,7 @@
 import kafka.message.InvalidMessageException
 import java.nio.ByteBuffer
 import java.lang.Throwable
+import scala.Predef._
 
 /**
  * A bi-directional mapping between error codes and exceptions x  
@@ -31,21 +32,23 @@
   val NoError = 0
   val OffsetOutOfRangeCode = 1
   val InvalidMessageCode = 2
-  val WrongPartitionCode = 3
+  val InvalidPartitionCode = 3
   val InvalidFetchSizeCode = 4
   val InvalidFetchRequestFormatCode = 5
-  val NoLeaderForPartitionCode = 6
-  val NotLeaderForPartitionCode = 7
+  val NotLeaderForPartitionCode = 6
+  val NoLeaderForPartitionCode = 7
+  val UnknownTopicCode = 8
 
   private val exceptionToCode = 
     Map[Class[Throwable], Int](
       classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
-      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
+      classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
-      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
-      classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode
+      classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
+      classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode
+//      classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
Index: core/src/main/scala/kafka/common/UnknownTopicException.scala
===================================================================
--- core/src/main/scala/kafka/common/UnknownTopicException.scala	(revision 0)
+++ core/src/main/scala/kafka/common/UnknownTopicException.scala	(revision 0)
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a request is made for a topic that hasn't been created in the Kafka cluster
+ */
+class UnknownTopicException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/network/SocketServer.scala
===================================================================
--- core/src/main/scala/kafka/network/SocketServer.scala	(revision 1344101)
+++ core/src/main/scala/kafka/network/SocketServer.scala	(working copy)
@@ -64,6 +64,7 @@
    * Shutdown the socket server
    */
   def shutdown() = {
+    info("Shutting down socket server")
     acceptor.shutdown
     for(processor <- processors)
       processor.shutdown
Index: core/src/main/scala/kafka/utils/ZkUtils.scala
===================================================================
--- core/src/main/scala/kafka/utils/ZkUtils.scala	(revision 1344101)
+++ core/src/main/scala/kafka/utils/ZkUtils.scala	(working copy)
@@ -133,20 +133,21 @@
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
     try {
       // NOTE: first increment epoch, then become leader
       val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
       createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
         "%d;%d".format(brokerId, newEpoch))
       val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
       updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
-        "%s;%d".format(currentISR.mkString(","), newEpoch))
+        "%s;%d".format(updatedISR.mkString(","), newEpoch))
       info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
-      Some(newEpoch)
+      Some(newEpoch, updatedISR)
     } catch {
       case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
-      case oe => None
+      case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
     }
   }
 
@@ -161,7 +162,7 @@
 
     val newEpoch = epoch match {
       case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, epoch))
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
         partitionEpoch + 1
       case None =>
         // this is the first time leader is elected for this partition. So set epoch to 1
Index: core/src/main/scala/kafka/utils/KafkaScheduler.scala
===================================================================
--- core/src/main/scala/kafka/utils/KafkaScheduler.scala	(revision 1344101)
+++ core/src/main/scala/kafka/utils/KafkaScheduler.scala	(working copy)
@@ -39,24 +39,24 @@
 
   def hasShutdown: Boolean = executor.isShutdown
 
-  private def checkIfExecutorHasStarted = {
+  private def ensureExecutorHasStarted = {
     if(executor == null)
       throw new IllegalStateException("Kafka scheduler has not been started")
   }
 
   def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = {
-    checkIfExecutorHasStarted
+    ensureExecutorHasStarted
     executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
   }
 
   def shutdownNow() {
-    checkIfExecutorHasStarted
+    ensureExecutorHasStarted
     executor.shutdownNow()
     info("Forcing shutdown of scheduler " + baseThreadName)
   }
 
   def shutdown() {
-    checkIfExecutorHasStarted
+    ensureExecutorHasStarted
     executor.shutdown()
     info("Shutdown scheduler " + baseThreadName)
   }

Property changes on: core/src/main/scala/kafka/server
___________________________________________________________________
Added: svn:ignore
   + FollowerReplica.scala


Index: core/src/main/scala/kafka/server/KafkaZooKeeper.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(revision 1344101)
+++ core/src/main/scala/kafka/server/KafkaZooKeeper.scala	(working copy)
@@ -17,14 +17,15 @@
 
 package kafka.server
 
-import java.lang.{Thread, IllegalStateException}
 import java.net.InetAddress
-import kafka.admin.AdminUtils
 import kafka.cluster.Replica
-import kafka.common.{NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import kafka.admin.AdminUtils
+import java.lang.{Thread, IllegalStateException}
+import collection.mutable.HashSet
+import kafka.common.{InvalidPartitionException, NoLeaderForPartitionException, NotLeaderForPartitionException, KafkaZookeeperClient}
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -33,8 +34,10 @@
  *
  */
 class KafkaZooKeeper(config: KafkaConfig,
-                     addReplicaCbk: (String, Int) => Replica,
-                     getReplicaCbk: (String, Int) => Option[Replica]) extends Logging {
+                     addReplicaCbk: (String, Int, Set[Int]) => Replica,
+                     getReplicaCbk: (String, Int) => Option[Replica],
+                     becomeLeader: (Replica, Seq[Int]) => Unit,
+                     becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
@@ -109,6 +112,12 @@
   }
 
   def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
+    // TODO: KAFKA-352: first check if this topic exists in the cluster
+//    if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
+//      throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
+    // check if partition id is invalid
+    if(partition < 0)
+      throw new InvalidPartitionException("Partition %d is invalid".format(partition))
     ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
       case Some(leader) =>
         if(leader != config.brokerId)
@@ -179,8 +188,9 @@
   private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
     partitions.foreach { partition =>
       val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+      info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
       if(assignedReplicas.contains(config.brokerId)) {
-        val replica = addReplicaCbk(topic, partition)
+        val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
         startReplica(replica)
       } else
         warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
@@ -189,43 +199,114 @@
   }
 
   private def startReplica(replica: Replica) {
-    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partId, replica.brokerId))
-    replica.log match {
-      case Some(log) =>  // log is already started
-      case None =>
-      // TODO: Add log recovery upto the last checkpointed HW as part of KAFKA-46
-    }
-    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partId) match {
-      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
+    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId,
+      replica.brokerId))
+    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,
+        leader))
+        // check if this broker is the leader, if not, then become follower
+        if(leader != config.brokerId)
+          becomeFollower(replica, leader, zkClient)
       case None => // leader election
         leaderElection(replica)
     }
   }
 
   def leaderElection(replica: Replica) {
-    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partId))
+    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
     // read the AR list for replica.partition from ZK
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partId).map(r => r.toInt)
-    // TODO: read the ISR as part of KAFKA-302
-    if(assignedReplicas.contains(replica.brokerId)) {
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
+    val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
+    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId,
+                       assignedReplicas, inSyncReplicas, liveBrokers)) {
+      info("Broker %d will participate in leader election for topic %s partition %d".format(config.brokerId, replica.topic,
+                                                                                           replica.partition.partitionId))
       // wait for some time if it is not the preferred replica
       try {
-        if(replica.brokerId != assignedReplicas.head)
-          Thread.sleep(config.preferredReplicaWaitTime)
+        if(replica.brokerId != assignedReplicas.head) {
+          // sleep only if the preferred replica is alive
+          if(liveBrokers.contains(assignedReplicas.head)) {
+            info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) +
+              "partition %d is alive. Waiting for %d ms to allow it to become leader"
+              .format(replica.partition.partitionId, config.preferredReplicaWaitTime))
+            Thread.sleep(config.preferredReplicaWaitTime)
+          }
+        }
       }catch {
         case e => // ignoring
       }
-      val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
-      newLeaderEpoch match {
-        case Some(epoch) =>
-          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
-          // TODO: Become leader as part of KAFKA-302
+      val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
+        replica.partition.partitionId, replica.brokerId)
+      newLeaderEpochAndISR match {
+        case Some(epochAndISR) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic,
+            replica.partition.partitionId))
+          info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId,
+                                                                    epochAndISR._2.mkString(",")))
+          becomeLeader(replica, epochAndISR._2)
         case None =>
+          ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+            case Some(leader) =>
+              becomeFollower(replica, leader, zkClient)
+            case None =>
+              error("Lost leader for topic %s partition %d right after leader election".format(replica.topic,
+                replica.partition.partitionId))
+          }
       }
     }
   }
 
+  /**
+   * Decides whether broker brokerId may take part in leader election for the given topic partition.
+   * A broker in the ISR is always eligible. A broker outside the ISR is eligible only if it is in the
+   * assigned replicas list and either the ISR is empty or none of the brokers in the ISR is alive.
+   */
+  private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int],
+                              inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = {
+    // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
+    assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " +
+      " %s partition %d".format(topic, partition))
+    inSyncReplicas.size > 0 match {
+      case true => // check if this broker is in the ISR. If yes, return true
+        inSyncReplicas.contains(brokerId) match {
+          case true =>
+            info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) +
+              " for topic %s partition %d".format(topic, partition))
+            true
+          case false =>
+            // check if any broker in the ISR is alive. If not, return true only if this broker is in the AR
+            val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r))
+            liveBrokersInISR.isEmpty match {
+              case true =>
+                val inAssignedReplicas = assignedReplicas.contains(brokerId)
+                val verdict = if(inAssignedReplicas) "can become leader since it is" else "cannot become leader since it is not"
+                info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+                  " partition %d is alive. Broker %d %s in the assigned replicas %s"
+                    .format(partition, brokerId, verdict, assignedReplicas.mkString(",")))
+                inAssignedReplicas
+              case false =>
+                info("ISR for topic %s partition %d is %s. Out of these %s brokers are alive. Broker %d "
+                  .format(topic, partition, inSyncReplicas.mkString(","), liveBrokersInISR.mkString(","), brokerId) +
+                  "cannot become leader since it doesn't exist in the ISR")
+                false  // let one of the live brokers in the ISR become the leader
+            }
+        }
+      case false =>
+        if(assignedReplicas.contains(brokerId)) {
+          info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
+            .format(topic, partition, brokerId) + "is part of the assigned replicas list")
+          true
+        }else {
+          info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
+            .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
+          false
+        }
+    }
+  }
+
   class TopicChangeListener extends IZkChildListener with Logging {
+    private val allTopics = new HashSet[String]()
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
@@ -233,12 +314,16 @@
         debug("Topic/partition change listener fired for path " + parentPath)
         import scala.collection.JavaConversions._
         val currentChildren = asBuffer(curChilds)
+        allTopics.clear()
         // check if topic has changed or a partition for an existing topic has changed
         if(parentPath == ZkUtils.BrokerTopicsPath) {
           val currentTopics = currentChildren
           debug("New topics " + currentTopics.mkString(","))
           // for each new topic [topic], watch the path /brokers/topics/[topic]/partitions
-          currentTopics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this))
+          currentTopics.foreach { topic =>
+            zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this)
+            allTopics += topic
+          }
           handleNewTopics(currentTopics)
         }else {
           val topic = parentPath.split("/").takeRight(2).head
@@ -248,6 +333,10 @@
         }
       }
     }
+
+    /** Whether topic is currently recorded in allTopics, the set refreshed by handleChildChange. */
+    def doesTopicExistInCluster(topic: String): Boolean =
+      allTopics.contains(topic)
   }
 
   private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
@@ -275,9 +364,19 @@
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       // handle leader change event for path
-      val newLeader: String = data.asInstanceOf[String]
-      debug("Leader change listener fired for path %s. New leader is %s".format(dataPath, newLeader))
-      // TODO: update the leader in the list of replicas maintained by the log manager
+      val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
+      val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
+      val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
+      debug("Leader change listener fired for path %s. New leader is %d. New epoch is %d".format(dataPath, newLeader, newEpoch))
+      val topicPartitionInfo = dataPath.split("/")
+      val topic = topicPartitionInfo.takeRight(4).head
+      val partition = topicPartitionInfo.takeRight(2).head.toInt
+      info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
+      val replica = getReplicaCbk(topic, partition).getOrElse(null)
+      assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
+        .format(topic, partition, config.brokerId))
+      replica.partition.leaderId(Some(newLeader))
+      assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
     }
 
     @throws(classOf[Exception])
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1344101)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -20,6 +20,7 @@
 import java.util.Properties
 import kafka.utils.{Utils, ZKConfig}
 import kafka.message.Message
+import kafka.consumer.ConsumerConfig
 
 /**
  * Configuration settings for the kafka server
@@ -105,7 +106,27 @@
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
+  val keepInSyncTimeMs = Utils.getLong(props, "isr.in.sync.time.ms", 30000)
+
+  val keepInSyncBytes = Utils.getLong(props, "isr.in.sync.bytes", 4000)
+
   /* size of the state change request queue in Zookeeper */
   val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
 
+  /**
+   * Config options relevant to a follower for a replica
+   */
+  /** the socket timeout for network requests */
+  val replicaSocketTimeoutMs = Utils.getInt(props, "replica.socket.timeout.ms", ConsumerConfig.SocketTimeout)
+
+  /** the socket receive buffer for network requests */
+  val replicaSocketBufferSize = Utils.getInt(props, "replica.socket.buffersize", ConsumerConfig.SocketBufferSize)
+
+  /** the number of bytes of messages to attempt to fetch */
+  val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
+
+  val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
+
+  val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
+
  }
Index: core/src/main/scala/kafka/server/KafkaRequestHandler.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(revision 1344101)
+++ core/src/main/scala/kafka/server/KafkaRequestHandler.scala	(working copy)
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import org.apache.log4j._
 import kafka.network._
 import kafka.utils._
 
@@ -42,7 +41,7 @@
 
 class KafkaRequestHandlerPool(val requestChannel: RequestChannel, 
                               val apis: KafkaApis, 
-                              numThreads: Int) { 
+                              numThreads: Int) extends Logging {
   
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
@@ -53,10 +52,12 @@
   }
   
   def shutdown() {
+    info("Shutting down request handlers")
     for(handler <- runnables)
       handler.shutdown
     for(thread <- threads)
       thread.join
+    info("Request handlers shut down")
   }
   
 }
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1344101)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -21,15 +21,16 @@
 import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
-import kafka.cluster.Replica
 import java.util.concurrent._
 import atomic.AtomicBoolean
+import kafka.cluster.Replica
+import org.I0Itec.zkclient.ZkClient
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
  * to start up and shutdown a single Kafka node.
  */
-class KafkaServer(val config: KafkaConfig) extends Logging {
+class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
@@ -39,7 +40,8 @@
   var requestHandlerPool: KafkaRequestHandlerPool = null
   private var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
-  val replicaManager = new ReplicaManager(config)
+  private var replicaManager: ReplicaManager = null
+  private var apis: KafkaApis = null
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
@@ -68,9 +70,11 @@
                                     config.maxSocketRequestSize)
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica)
+    kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, makeLeader, makeFollower)
 
-    val apis = new KafkaApis(socketServer.requestChannel, logManager, kafkaZookeeper)
+    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient)
+
+    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
     socketServer.startup
 
@@ -84,6 +88,7 @@
 
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup
+
     info("Server started.")
   }
   
@@ -95,7 +100,9 @@
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("Shutting down Kafka server with id " + config.brokerId)
-      kafkaZookeeper.close
+      apis.close()
+      if(replicaManager != null)
+        replicaManager.close()
       if (socketServer != null)
         socketServer.shutdown()
       if(requestHandlerPool != null)
@@ -103,6 +110,7 @@
       Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
         logManager.close()
+      kafkaZookeeper.close
 
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
@@ -117,14 +125,24 @@
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
-  def addReplica(topic: String, partition: Int): Replica = {
+  def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
+    info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
     // get local log
     val log = logManager.getOrCreateLog(topic, partition)
-    replicaManager.addLocalReplica(topic, partition, log)
+    replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
   }
 
-  def getReplica(topic: String, partition: Int): Option[Replica] = replicaManager.getReplica(topic, partition)
+  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    replicaManager.makeLeader(replica, currentISRInZk)
+  }
 
+  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+    replicaManager.makeFollower(replica, leaderBrokerId, zkClient)
+  }
+
+  def getReplica(topic: String, partition: Int): Option[Replica] =
+    replicaManager.getReplica(topic, partition)
+
   def getLogManager(): LogManager = logManager
 
   def getStats(): SocketServerStats = socketServer.stats
Index: core/src/main/scala/kafka/server/RequestPurgatory.scala
===================================================================
--- core/src/main/scala/kafka/server/RequestPurgatory.scala	(revision 1344101)
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala	(working copy)
@@ -21,7 +21,6 @@
 import java.util.LinkedList
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import kafka.api._
 import kafka.network._
 import kafka.utils._
 
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 0)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala	(revision 0)
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+import kafka.api.FetchRequestBuilder
+import kafka.utils.Logging
+import kafka.cluster.{Broker, Replica}
+import kafka.consumer.SimpleConsumer
+
+/**
+ * Background thread run on a follower: it repeatedly fetches messages for one replica from the
+ * partition's leader broker, appends them to the replica's local log and records the high
+ * watermark reported by the leader.
+ *
+ * @param name         thread name, also sent as the client id of every fetch request
+ * @param replica      the replica whose log is appended to
+ * @param leaderBroker the broker to fetch from
+ * @param config       source of the replica socket and fetch settings
+ */
+class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig)
+  extends Thread(name) with Logging {
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  // released once run() has finished; shutdown() blocks on it
+  private val shutdownLatch = new CountDownLatch(1)
+  private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port,
+    config.replicaSocketTimeoutMs, config.replicaSocketBufferSize)
+
+  /**
+   * Fetch loop. Runs until isRunning is cleared or any exception is thrown; either way the
+   * consumer is closed and the shutdown latch released before the thread exits.
+   */
+  override def run() {
+    try {
+      info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId))
+      while(isRunning.get()) {
+        val builder = new FetchRequestBuilder().
+          clientId(name).
+          replicaId(replica.brokerId).
+          maxWait(config.replicaMaxWaitTimeMs).
+          minBytes(config.replicaMinBytes)
+
+        // TODO: KAFKA-339 Keep this simple single fetch for now. Change it to fancier multi fetch when message
+        // replication actually works
+        val fetchOffset = replica.logEndOffset()
+        trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d"
+          .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset))
+        builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.replicaFetchSize)
+
+        val fetchRequest = builder.build()
+        val response = replicaConsumer.fetch(fetchRequest)
+        // TODO: KAFKA-339 Check for error. Don't blindly read the messages
+        // append messages to local log
+        replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
+        // record the hw sent by the leader for this partition
+        val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw)
+        replica.highWatermark(Some(followerHighWatermark))
+        trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+          .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark()))
+      }
+    }catch {
+      case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name))
+      case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1)
+    }finally {
+      // always release the latch: if this were skipped, shutdown() would block forever in await()
+      shutdownComplete()
+    }
+  }
+
+  /** Closes the connection to the leader and signals shutdown() that the thread is done. */
+  private def shutdownComplete() = {
+    try {
+      replicaConsumer.close()
+    }finally {
+      shutdownLatch.countDown
+    }
+  }
+
+  def getLeader(): Broker = leaderBroker
+
+  /**
+   * Stops the fetch loop: clears isRunning, interrupts the thread and waits until run() has
+   * finished its cleanup.
+   */
+  def shutdown() {
+    info("Shutting down replica fetcher thread")
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("Replica fetcher thread shutdown completed")
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1344101)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.io.IOException
-import java.lang.IllegalStateException
 import java.util.concurrent.atomic._
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
@@ -30,12 +29,14 @@
 import org.apache.log4j.Logger
 import scala.collection._
 import scala.math._
+import java.lang.IllegalStateException
 
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
-  
+class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
+                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
+
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -74,7 +75,7 @@
     }
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
-       val topicData = readMessageSets(fetchReq.fetch.offsetInfo)
+       val topicData = readMessageSets(fetchReq.fetch)
        val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
        requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
@@ -93,10 +94,10 @@
       for(partitionData <- topicData.partitionData) {
         msgIndex += 1
         try {
-          // TODO: need to handle ack's here!  Will probably move to another method.
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages)
+          replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
           offsets(msgIndex) = log.nextAppendOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
@@ -127,18 +128,24 @@
 
     // validate the request
     try {
-      fetchRequest.validate()  
+      fetchRequest.validate()
     } catch {
       case e:FetchRequestFormatException =>
         val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.InvalidFetchRequestFormatCode), -1)
+        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response,
+          ErrorMapping.InvalidFetchRequestFormatCode), -1)
         requestChannel.sendResponse(channelResponse)
     }
-    
+
+    if(fetchRequest.replicaId != -1)
+      maybeUpdatePartitionHW(fetchRequest)
+
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
     if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
-      val topicData = readMessageSets(fetchRequest.offsetInfo)
+      val topicData = readMessageSets(fetchRequest)
+      debug("Returning fetch response %s for fetch request with correlation id %d"
+        .format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     } else {
@@ -157,34 +164,71 @@
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
         try {
-          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i)) 
+          debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
+          val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
           val available = maybeLog match {
-            case Some(log) => max(0, log.getHighwaterMark - offsetDetail.offsets(i))
+            case Some(log) => max(0, log.highwaterMark - offsetDetail.offsets(i))
             case None => 0
           }
-    	  totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+          totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
-          case e: InvalidPartitionException => 
+          case e: InvalidPartitionException =>
             info("Invalid partition " + offsetDetail.partitions(i) + "in fetch request from client '" + fetchRequest.clientId + "'")
         }
       }
     }
     totalBytes
   }
-  
+
+  /** Reports every (topic, partition, offset) named in the fetch request to the replica manager. */
+  private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
+    val followerId = fetchRequest.replicaId
+    for(detail <- fetchRequest.offsetInfo) {
+      // partitions and offsets are paired by position; pairing stops at the shorter of the two
+      val positions = detail.partitions zip detail.offsets
+      positions.foreach { case (partition, offset) =>
+        replicaManager.recordFollowerPosition(detail.topic, partition, followerId, offset,
+          kafkaZookeeper.getZookeeperClient)
+      }
+    }
+  }
+
   /**
    * Read from all the offset details given and produce an array of topic datas
    */
-  private def readMessageSets(offsets: Seq[OffsetDetail]): Array[TopicData] = {
+  private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
+    val offsets = fetchRequest.offsetInfo
     val fetchedData = new mutable.ArrayBuffer[TopicData]()
+
     for(offsetDetail <- offsets) {
       val info = new mutable.ArrayBuffer[PartitionData]()
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
         val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
-          case Left(err) => new PartitionData(partition, err, offset, MessageSet.Empty)
-          case Right(messages) => new PartitionData(partition, ErrorMapping.NoError, offset, messages)
+          case Left(err) =>
+            fetchRequest.replicaId match {
+              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+              case _ =>
+                new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+            }
+          case Right(messages) =>
+            val leaderReplicaOpt = replicaManager.getReplica(topic, partition, logManager.config.brokerId)
+            assert(leaderReplicaOpt.isDefined, "Leader replica for topic %s partition %d".format(topic, partition) +
+              " must exist on leader broker %d".format(logManager.config.brokerId))
+            val leaderReplica = leaderReplicaOpt.get
+            fetchRequest.replicaId match {
+              case -1 => // replica id value of -1 signifies a fetch request from an external client, not from one of the replicas
+                new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
+              case _ => // fetch request from a follower
+                val replicaOpt = replicaManager.getReplica(topic, partition, fetchRequest.replicaId)
+                assert(replicaOpt.isDefined, "No replica %d in replica manager on %d"
+                  .format(fetchRequest.replicaId, replicaManager.config.brokerId))
+                val replica = replicaOpt.get
+                debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+                  .format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+                new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
+            }
         }
         info.append(partitionInfo)
       }
@@ -192,13 +236,15 @@
     }
     fetchedData.toArray
   }
-  
+
   /**
    * Read from a single topic/partition at the given offset
    */
   private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
     var response: Either[Int, MessageSet] = null
     try {
+      // check if the current broker is the leader for the partitions
+      kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
       trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
       val log = logManager.getLog(topic, partition)
       response = Right(log match { case Some(l) => l.read(offset, maxSize) case None => MessageSet.Empty })
@@ -259,6 +305,10 @@
     info("Sending response for topic metadata request")
     requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1))
   }
+
+  def close() {
+    fetchRequestPurgatory.shutdown()
+  }
   
   /**
    * A delayed fetch request
@@ -285,7 +335,7 @@
      * When a request expires just answer it with whatever data is present
      */
     def expire(delayed: DelayedFetch) {
-      val topicData = readMessageSets(delayed.fetch.offsetInfo)
+      val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
     }
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1344101)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -18,47 +18,232 @@
 
 import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
-import kafka.utils.Logging
 import collection.mutable
+import java.lang.IllegalStateException
+import mutable.ListBuffer
+import org.I0Itec.zkclient.ZkClient
+import java.util.concurrent.locks.ReentrantLock
+import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
+import kafka.common.InvalidPartitionException
 
-class ReplicaManager(config: KafkaConfig) extends Logging {
+class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
 
-  private val replicas = new mutable.HashMap[(String, Int), Replica]()
+  private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
+  private var leaderReplicas = new ListBuffer[Partition]()
+  private val leaderReplicaLock = new ReentrantLock()
+  private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
+  // start ISR expiration thread
+  isrExpirationScheduler.startUp
+  isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
 
-  def addLocalReplica(topic: String, partitionId: Int, log: Log): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
-        r.log match {
+  def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
+    val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
+    val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+
+    val replicaOpt = partition.getReplica(config.brokerId)
+    replicaOpt match {
+      case Some(replica) =>
+        info("Changing remote replica %s into a local replica".format(replica.toString))
+        replica.log match {
           case None =>
-            r.log = Some(log)
-          case Some(l) => // nothing to do since log already exists
+            replica.log = Some(log)
+          case Some(log) => // nothing to do since log already exists
         }
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, Some(log), log.getHighwaterMark, log.maxSize, true)
-        replicas.put((topic, partitionId), replica)
-        info("Added local replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        partition.addReplica(localReplica)
     }
-    replicas.get((topic, partitionId)).get
+    val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
+    partition.assignedReplicas(Some(assignedReplicas))
+    // get the replica objects for the assigned replicas for this partition
+    info("Added local replica %d for topic %s partition %s on broker %d"
+      .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+    localReplica
   }
 
-  def addRemoteReplica(topic: String, partitionId: Int): Replica = {
-    val replica = replicas.get((topic, partitionId))
-    replica match {
-      case Some(r) =>
+  /** Returns the cached partition, or registers a new one with a remote replica per non-local assigned id. */
+  def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
+    val key = (topic, partitionId)
+    allReplicas.get(key) match {
+      case Some(existing) => existing // partition already known, nothing to create
+      case None => // first sighting: register it and add a remote replica for every other assigned broker
+        val created = new Partition(topic, partitionId, time)
+        allReplicas += key -> created
+        for(replicaId <- assignedReplicaIds - config.brokerId)
+          addRemoteReplica(topic, partitionId, replicaId, created)
+        created
+    }
+  }
+
+  def ensurePartitionExists(topic: String, partitionId: Int): Partition = {
+    val partitionOpt = allReplicas.get((topic, partitionId))
+    partitionOpt match {
+      case Some(partition) => partition
       case None =>
-        val partition = new Partition(topic, partitionId)
-        val replica = new Replica(config.brokerId, partition, topic, None, -1, -1, false)
-        replicas.put((topic, partitionId), replica)
-        info("Added remote replica for topic %s partition %s on broker %d"
-          .format(replica.topic, replica.partition.partId, replica.brokerId))
+        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
+        .format(topic, partitionId, config.brokerId))
     }
-    replicas.get((topic, partitionId)).get
   }
 
-  def getReplica(topic: String, partitionId: Int): Option[Replica] = {
-    replicas.get((topic, partitionId))
+  /** Builds a replica object for `replicaId` (constructed without a log), offers it to `partition` and returns it. */
+  def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
+    val remoteReplica = new Replica(replicaId, partition, topic)
+    // the addition is logged only when partition.addReplica returns true
+    if(partition.addReplica(remoteReplica))
+      info("Added remote replica %d for topic %s partition %s on broker %d"
+        .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId))
+    remoteReplica
+  }
-}
\ No newline at end of file
+
+  /**
+   * Looks up a replica of the given topic partition in this manager's partition cache.
+   * @param replicaId broker id of the wanted replica; defaults to this broker's id
+   * @return None when the partition is unknown here, otherwise whatever Partition.getReplica yields
+   */
+  def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
+    val partitionOpt = allReplicas.get((topic, partitionId))
+    partitionOpt.flatMap(_.getReplica(replicaId))
+  }
+
+  /**
+   * Leader replica of the partition, always wrapped in Some; an unknown partition throws IllegalStateException.
+   */
+  def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
+    val partition = allReplicas.get((topic, partitionId)).getOrElse {
+      throw new IllegalStateException("Getting leader replica failed. Partition replica metadata for topic " +
+      "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+    }
+    Some(partition.leaderReplica())
+  }
+
+  def getPartition(topic: String, partitionId: Int): Option[Partition] =
+    allReplicas.get((topic, partitionId))
+
+  def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+    // the owning partition performs the update; ensurePartitionExists throws InvalidPartitionException if it is unknown here
+    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+    partition.updateReplicaLEO(replica, fetchOffset)
+  }
+
+  def maybeIncrementLeaderHW(replica: Replica) {
+    // look up the partition of this replica; ensurePartitionExists throws InvalidPartitionException if it is unknown here
+    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+    // candidate HW is the smallest log end offset among the in-sync replicas; this method only ever raises the HW
+    val allLeos = partition.inSyncReplicas.map(_.logEndOffset())
+    val newHw = allLeos.min
+    val oldHw = partition.leaderHW()
+    if(newHw > oldHw) {
+      debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
+      partition.leaderHW(Some(newHw))
+    }
+  }
+
+  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    // stop the replica fetcher thread, if any, before this replica is recorded as leader
+    replica.stopReplicaFetcherThread()
+    // record this broker as the partition leader and cache the ISR handed in by the caller
+    replica.partition.leaderId(Some(replica.brokerId))
+    replica.partition.updateISR(currentISRInZk.toSet)
+    // add the partition to the list scanned by maybeShrinkISR; the lock guards the list against that scan
+    try {
+      leaderReplicaLock.lock()
+      leaderReplicas += replica.partition
+    }finally {
+      leaderReplicaLock.unlock()
+    }
+  }
+
+  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+    info("Broker %d becoming follower to leader %d for topic %s partition %d"
+      .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+    // remove this replica's partition from the list scanned by maybeShrinkISR (guarded by the same lock)
+    try {
+      leaderReplicaLock.lock()
+      leaderReplicas -= replica.partition
+    }finally {
+      leaderReplicaLock.unlock()
+    }
+    replica.log match {
+      case Some(log) =>  // log is already started
+        log.recoverUptoLastCheckpointedHW()
+      case None => // no local log on this replica object, nothing to recover
+    }
+    // get leader for this replica; .head throws if no broker info comes back — NOTE(review): confirm the leader is always registered
+    val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
+    val isReplicaAFollower = replica.getIfFollowerAndLeader()
+    // Become follower only if it is not already following the same leader
+    if(!(isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id))) {
+      // stop fetcher thread to previous leader
+      replica.stopReplicaFetcherThread()
+      // start fetcher thread to current leader
+      replica.startReplicaFetcherThread(leaderBroker, config)
+    }
+  }
+
+  /** Periodic task: removes slow or stuck followers from the ISR of each partition in leaderReplicas. */
+  def maybeShrinkISR(): Unit = {
+    // acquire before try: finally must never unlock a lock that this thread does not hold
+    leaderReplicaLock.lock()
+    try {
+      info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+      leaderReplicas.foreach { partition =>
+         // shrink ISR if a follower is slow or stuck
+        val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.keepInSyncTimeMs, config.keepInSyncBytes)
+        if(outOfSyncReplicas.size > 0) {
+          val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
+          assert(newInSyncReplicas.size > 0)
+          info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
+            newInSyncReplicas.map(_.brokerId).mkString(",")))
+          // update ISR in zk and in memory
+          partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
+        }
+      }
+    }catch {
+      case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
+    }finally {
+      leaderReplicaLock.unlock()
+    }
+  }
+
+  /** True when `replica` is assigned to its partition, is not in the ISR, and its log end offset has reached the leader HW. */
+  def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
+    val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
+    val alreadyInSync = partition.inSyncReplicas.contains(replica)
+    if(!alreadyInSync && !partition.assignedReplicas().contains(replica))
+      throw new IllegalStateException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+        " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+    // a replica already in the ISR never qualifies; otherwise compare the leader HW with its log end offset
+    !alreadyInSync && partition.leaderHW() <= replica.logEndOffset()
+  }
+
+  /**
+   * Passes a follower's offset to updateReplicaLEO, expands the ISR when the follower qualifies, then re-evaluates the leader HW.
+   */
+  def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
+    val follower = getReplica(topic, partition, replicaId).getOrElse {
+      throw new IllegalStateException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
+    }
+    updateReplicaLEO(follower, offset)
+    // check if this replica needs to be added to the ISR
+    if(checkIfISRCanBeExpanded(follower)) {
+      val expandedISR = follower.partition.inSyncReplicas + follower
+      // update ISR in ZK and cache
+      follower.partition.updateISR(expandedISR.map(_.brokerId), Some(zkClient))
+    }
+    maybeIncrementLeaderHW(follower)
+  }
+
+  /** Stamps this broker's replica of the partition with the current time as its log end offset update time. */
+  def recordLeaderLogUpdate(topic: String, partition: Int) = {
+    // this broker's own replica must already be registered; a missing one is reported as an illegal state
+    val localReplica = getReplica(topic, partition, config.brokerId).getOrElse {
+      throw new IllegalStateException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
+    }
+    // the time comes from the injected Time instance
+    localReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+  }
+
+  def close() { // stop the ISR expiration scheduler first, then close every assigned replica of every partition
+    isrExpirationScheduler.shutdown()
+    allReplicas.values.foreach(partition => partition.assignedReplicas().foreach(replica => replica.close()))
+  }
+}
Index: core/src/main/scala/kafka/api/FetchRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchRequest.scala	(revision 1344101)
+++ core/src/main/scala/kafka/api/FetchRequest.scala	(working copy)
@@ -18,10 +18,10 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.FetchRequestFormatException
 import kafka.network.Request
 import kafka.utils.Utils
 import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
+import kafka.common.FetchRequestFormatException
 
 object OffsetDetail {
 
@@ -116,7 +116,8 @@
     var topics = Set[String]()
     val iter = offsetInfo.iterator
     while(iter.hasNext) {
-      val topic = iter.next.topic
+      val offsetData = iter.next()
+      val topic = offsetData.topic
       if(topics.contains(topic))
         throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic)
       else
Index: core/src/main/scala/kafka/api/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchResponse.scala	(revision 1344101)
+++ core/src/main/scala/kafka/api/FetchResponse.scala	(working copy)
@@ -29,18 +29,20 @@
     val partition = buffer.getInt
     val error = buffer.getInt
     val initialOffset = buffer.getLong
+    val hw = buffer.getLong()
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
   }
 }
 
-case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
-  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
+case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L,
+                         messages: MessageSet) {
+  val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8
 
-  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
+  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 
 }
 
@@ -117,6 +119,14 @@
     }
     messageSet.asInstanceOf[ByteBufferMessageSet]
   }
+
+  /** High watermark carried for the topic partition, or -1 when the response has no entry for it. */
+  def highWatermark(topic: String, partition: Int): Long = {
+    val hwOpt = topicMap.get(topic).flatMap { topicData =>
+      TopicData.findPartition(topicData.partitionData, partition).map(_.hw)
+    }
+    hwOpt.getOrElse(-1L)
+  }
 }
 
 // SENDS
@@ -125,10 +135,11 @@
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(20)
+  private val buffer = ByteBuffer.allocate(28)
   buffer.putInt(partitionData.partition)
   buffer.putInt(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
+  buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes.intValue())
   buffer.rewind()
 
Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
===================================================================
--- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(revision 1344101)
+++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(working copy)
@@ -272,7 +272,7 @@
         } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
             throw new IOException(_input + " current offset=" + _offset
                     + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.WrongPartitionCode()) {
+        } else if (errorCode == ErrorMapping.InvalidPartitionCode()) {
             throw new IOException(_input + " : wrong partition");
         } else if (errorCode != ErrorMapping.NoError()) {
             throw new IOException(_input + " current offset=" + _offset
