Index: core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(revision 1364025)
+++ core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala	(working copy)
@@ -40,7 +40,6 @@
     // create leader replica
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
-    EasyMock.expect(log.setHW(5L)).times(1)
     EasyMock.replay(log)
 
     // add one partition
@@ -48,10 +47,10 @@
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
     // set remote replicas leo to something low, like 2
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2))
 
     time.sleep(150)
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(5L))
 
     var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -60,9 +59,9 @@
     partition0.inSyncReplicas ++= partition0.assignedReplicas()
     assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
 
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(5L))
     // let the follower catch up only upto 3
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3))
     time.sleep(150)
     // now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
     // pulled any data for > keepInSyncTimeMs ms. So it is stuck
@@ -80,12 +79,12 @@
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
     // set remote replicas leo to something low, like 4
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4))
 
     time.sleep(150)
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(15L))
     time.sleep(10)
-    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4)))
 
     val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -115,7 +114,7 @@
       partition0.leaderHW(Some(5L))
 
       // set the leo for non-leader replicas to something low
-      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2))
 
       val log1 = getLogWithHW(15L)
       // create leader and follower replicas for partition 1
@@ -129,13 +128,13 @@
       partition1.leaderHW(Some(15L))
 
       // set the leo for non-leader replicas to something low
-      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
+      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4))
 
       time.sleep(150)
-      leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
-      leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
+      leaderReplicaPartition0.logEndOffset(Some(4L))
+      leaderReplicaPartition1.logEndOffset(Some(4L))
       time.sleep(10)
-      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L)))
 
       val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
       assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -155,9 +154,9 @@
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
                                                localLog: Log, leaderHW: Long): Partition = {
     val partition = new Partition(topic, partitionId, time)
-    val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+    val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
 
-    val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
+    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
     partition.assignedReplicas(Some(allReplicas.toSet))
     // set in sync replicas for this partition to all the assigned replicas
     partition.inSyncReplicas = allReplicas.toSet
@@ -170,15 +169,14 @@
   private def getLogWithHW(hw: Long): Log = {
     val log1 = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
-    EasyMock.expect(log1.setHW(hw)).times(1)
     EasyMock.replay(log1)
 
     log1
   }
 
-  private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
+  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
     configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition, topic)
+      new Replica(config.brokerId, partition, topic, time)
     }
   }
 }
\ No newline at end of file
Index: core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala	(revision 0)
+++ core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala	(revision 0)
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import kafka.log.Log
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, MockTime}
+import org.easymock.EasyMock
+import org.junit.Assert._
+
+class HighwatermarkPersistenceTest extends JUnit3Suite {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val defaultFlushIntervalMs = 100
+  })
+  val topic = "foo"
+
+  def testHighWatermarkPersistenceSinglePartition() {
+    // mock zkclient
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient)
+    replicaManager.startup()
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(0L, fooPartition0Hw)
+    val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val log0 = getMockLog
+    // create leader and follower replicas
+    val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
+    val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    try {
+      followerReplicaPartition0.highWatermark()
+      fail("Should fail with IllegalStateException")
+    } catch {
+      case e: IllegalStateException => // this is ok
+    }
+    // set the leader
+    partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
+    // set the highwatermark for local replica
+    partition0.leaderHW(Some(5L))
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    EasyMock.verify(zkClient)
+    EasyMock.verify(log0)
+  }
+
+  def testHighWatermarkPersistenceMultiplePartitions() {
+    val topic1 = "foo1"
+    val topic2 = "foo2"
+    // mock zkclient
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient)
+    replicaManager.startup()
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(0L, topic1Partition0Hw)
+    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val topic1Log0 = getMockLog
+    // create leader and follower replicas
+    val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
+    // set the leader
+    topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
+    // set the highwatermark for local replica
+    topic1Partition0.leaderHW(Some(5L))
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
+    assertEquals(5L, topic1Partition0Hw)
+    // add another partition and set highwatermark
+    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val topic2Log0 = getMockLog
+    // create leader and follower replicas
+    val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+    assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
+    // set the leader
+    topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId))
+    // set the highwatermark for local replica
+    topic2Partition0.leaderHW(Some(15L))
+    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark())
+    // change the highwatermark for topic1
+    topic1Partition0.leaderHW(Some(10L))
+    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    // verify checkpointed hw for topic 2
+    topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+    assertEquals(15L, topic2Partition0Hw)
+    // verify checkpointed hw for topic 1
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(10L, topic1Partition0Hw)
+    EasyMock.verify(zkClient)
+    EasyMock.verify(topic1Log0)
+    EasyMock.verify(topic2Log0)
+  }
+
+  private def getMockLog: Log = {
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.replay(log)
+    log
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1364025)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.api.OffsetRequest
-import java.io.{IOException, RandomAccessFile, File}
+import java.io.{IOException, File}
 import java.util.{Comparator, Collections, ArrayList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
 import kafka.utils._
@@ -120,12 +120,6 @@
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
 
-
-  /* create the leader highwatermark file handle */
-  private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
-
-  private var hw: Long = 0
-
   private val logStats = new LogStats(this)
 
   Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
@@ -206,8 +200,6 @@
         info("Closing log segment " + seg.file.getAbsolutePath)
         seg.messageSet.close()
       }
-      checkpointHW()
-      hwFile.close()
     }
   }
 
@@ -361,7 +353,6 @@
       segments.view.last.messageSet.flush()
       unflushed.set(0)
       lastflushedTime.set(System.currentTimeMillis)
-      checkpointHW()
      }
   }
 
@@ -433,14 +424,10 @@
     total
   }
 
-  def recoverUptoLastCheckpointedHW() {
-    if(hwFile.length() > 0) {
-      // read the last checkpointed hw from disk
-      hwFile.seek(0)
-      val lastKnownHW = hwFile.readLong()
+  def truncateTo(targetOffset: Long) {
       // find the log segment that has this hw
       val segmentToBeTruncated = segments.view.find(segment =>
-        lastKnownHW >= segment.start && lastKnownHW < segment.messageSet.getEndOffset())
+        targetOffset >= segment.start && targetOffset < segment.messageSet.getEndOffset())
 
       segmentToBeTruncated match {
         case Some(segment) =>
@@ -451,36 +438,24 @@
 
       segmentToBeTruncated match {
         case Some(segment) =>
-          segment.messageSet.truncateUpto(lastKnownHW)
-          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
+          segment.messageSet.truncateTo(targetOffset)
+          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
         case None =>
-          assert(lastKnownHW <= segments.view.last.messageSet.size,
+          assert(targetOffset <= segments.view.last.messageSet.size,
             "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
-              format(lastKnownHW, segments.view.last.messageSet.size, segments.view.last.file.getAbsolutePath))
+              format(targetOffset, segments.view.last.messageSet.size, segments.view.last.file.getAbsolutePath))
           error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
-            .format(lastKnownHW, segments.view.head.start, segments.view.last.messageSet.size))
+            .format(targetOffset, segments.view.head.start, segments.view.last.messageSet.size))
       }
 
-      val segmentsToBeDeleted = segments.view.filter(segment => segment.start >= lastKnownHW)
+      val segmentsToBeDeleted = segments.view.filter(segment => segment.start >= targetOffset)
       if(segmentsToBeDeleted.size < segments.view.size) {
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
         error("Failed to delete some segments during log recovery")
       }
-    }else
-      info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
   }
 
-  def setHW(latestLeaderHW: Long) {
-    hw = latestLeaderHW
-  }
-
-  def checkpointHW() {
-    hwFile.seek(0)
-    hwFile.writeLong(hw)
-    hwFile.getChannel.force(true)
-  }
-
   def topicName():String = {
     name.substring(0, name.lastIndexOf("-"))
   }
Index: core/src/main/scala/kafka/cluster/Replica.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Replica.scala	(revision 1364025)
+++ core/src/main/scala/kafka/cluster/Replica.scala	(working copy)
@@ -19,42 +19,37 @@
 
 import kafka.log.Log
 import java.lang.IllegalStateException
-import kafka.utils.Logging
+import kafka.utils.{SystemTime, Time, Logging}
 
 class Replica(val brokerId: Int,
               val partition: Partition,
               val topic: String,
-              var log: Option[Log] = None,
-              var leoUpdateTime: Long = -1L) extends Logging {
+              time: Time = SystemTime,
+              var hw: Option[Long] = None,
+              var log: Option[Log] = None) extends Logging {
   private var logEndOffset: Long = -1L
+  private var logEndOffsetUpdateTimeMs: Long = -1L
 
   def logEndOffset(newLeo: Option[Long] = None): Long = {
     isLocal match {
       case true =>
         newLeo match {
-          case Some(newOffset) => throw new IllegalStateException("Trying to set the leo %d for local log".format(newOffset))
+          case Some(newOffset) => logEndOffsetUpdateTimeMs = time.milliseconds; newOffset
           case None => log.get.logEndOffset
         }
       case false =>
         newLeo match {
           case Some(newOffset) =>
             logEndOffset = newOffset
+            logEndOffsetUpdateTimeMs = time.milliseconds
+            trace("Setting log end offset for replica %d for topic %s partition %d to %d"
+              .format(brokerId, topic, partition.partitionId, logEndOffset))
             logEndOffset
           case None => logEndOffset
         }
     }
   }
 
-  def logEndOffsetUpdateTime(time: Option[Long] = None): Long = {
-    time match {
-      case Some(t) =>
-        leoUpdateTime = t
-        leoUpdateTime
-      case None =>
-        leoUpdateTime
-    }
-  }
-
   def isLocal: Boolean = {
     log match {
       case Some(l) => true
@@ -62,6 +57,8 @@
     }
   }
 
+  def logEndOffsetUpdateTime = logEndOffsetUpdateTimeMs
+
   def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = {
     highwaterMarkOpt match {
       case Some(highwaterMark) =>
@@ -69,7 +66,7 @@
           case true =>
             trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
                                                                                    brokerId, highwaterMark))
-            log.get.setHW(highwaterMark)
+            hw = Some(highwaterMark)
             highwaterMark
           case false => throw new IllegalStateException("Unable to set highwatermark for topic %s ".format(topic) +
             "partition %d on broker %d, since there is no local log for this partition"
@@ -78,7 +75,11 @@
       case None =>
         isLocal match {
           case true =>
-            log.get.highwaterMark
+            hw match {
+              case Some(highWatermarkValue) => highWatermarkValue
+              case None => throw new IllegalStateException("HighWatermark does not exist for topic %s ".format(topic) +
+              " partition %d on broker %d but local log exists".format(partition.partitionId, brokerId))
+            }
           case false => throw new IllegalStateException("Unable to get highwatermark for topic %s ".format(topic) +
             "partition %d on broker %d, since there is no local log for this partition"
               .format(partition.partitionId, brokerId))
Index: core/src/main/scala/kafka/cluster/Partition.scala
===================================================================
--- core/src/main/scala/kafka/cluster/Partition.scala	(revision 1364025)
+++ core/src/main/scala/kafka/cluster/Partition.scala	(working copy)
@@ -57,7 +57,9 @@
     assignedReplicas
   }
 
-  def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
+  def getReplica(replicaId: Int): Option[Replica] = {
+    assignedReplicas().find(_.brokerId == replicaId)
+  }
 
   def addReplica(replica: Replica): Boolean = {
     if(!assignedReplicas.contains(replica)) {
@@ -66,8 +68,7 @@
     }else false
   }
 
-  def updateReplicaLEO(replica: Replica, leo: Long) {
-    replica.leoUpdateTime = time.milliseconds
+  def updateReplicaLeo(replica: Replica, leo: Long) {
     replica.logEndOffset(Some(leo))
     debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
   }
@@ -109,7 +110,7 @@
     val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset())
     info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
       possiblyStuckReplicas.map(_.brokerId).mkString(",")))
-    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime() < (time.milliseconds - keepInSyncTimeMs))
+    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime < (time.milliseconds - keepInSyncTimeMs))
     info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     val leader = leaderReplica()
     // Case 2 above
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1364025)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -211,10 +211,10 @@
     len - validUpTo
   }
 
-  def truncateUpto(hw: Long) = {
-    channel.truncate(hw)
-    setSize.set(hw)
-    setHighWaterMark.set(hw)
+  def truncateTo(targetOffset: Long) = {
+    channel.truncate(targetOffset)
+    setSize.set(targetOffset)
+    setHighWaterMark.set(targetOffset)
   }
 
   /**
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1364025)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -85,6 +85,10 @@
 
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup()
+    // start the replica manager
+    replicaManager.startup()
+
+    // start the controller
     kafkaController.startup()
 
     info("Server started.")
Index: core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
===================================================================
--- core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala	(revision 0)
+++ core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala	(revision 0)
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.io.RandomAccessFile
+import kafka.utils.Logging
+
+/**
+ * This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for
+ * all topics and partitions that this broker hosts. The format of this file is as follows -
+ * version                  (2 bytes)
+ * number of entries        (4 bytes)
+ * topic length             (2 bytes)
+ * topic                    (topic-length bytes)
+ * partition                (4 bytes)
+ * highwatermark            (8 bytes)
+*/
+
+object HighwaterMarkCheckpoint {
+  val highWatermarkFileName = "highwaterMark"
+  val currentHighwaterMarkFileVersion = 0
+}
+
+class HighwaterMarkCheckpoint(val path: String) extends Logging {
+  /* create the highwatermark file handle for all partitions */
+  private val hwFile = new RandomAccessFile(path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName, "rw")
+
+  def name: String = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName
+
+  def write(highwaterMarksPerPartition: Map[(String, Int), Long]) {
+    hwFile.synchronized {
+      hwFile.seek(0)
+      // checkpoint highwatermark for all partitions
+      // write the current version
+      hwFile.writeShort(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion)
+      // write the number of entries in the highwatermark file
+      hwFile.writeInt(highwaterMarksPerPartition.size)
+      highwaterMarksPerPartition.foreach { partitionAndHw =>
+        val topic = partitionAndHw._1._1
+        val partitionId = partitionAndHw._1._2
+        hwFile.writeUTF(topic)
+        hwFile.writeInt(partitionId)
+        hwFile.writeLong(partitionAndHw._2)
+      }
+      hwFile.getChannel.force(true)
+    }
+  }
+
+  def read(topic: String, partition: Int): Long = {
+    hwFile.synchronized {
+      hwFile.length() match {
+        case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+              "partition %d. Returning 0 as the highwatermark".format(partition))
+          0L
+        case _ =>
+          hwFile.seek(0)
+          val version = hwFile.readShort()
+          version match {
+            case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion =>
+              val numberOfHighWatermarks = hwFile.readInt()
+              val partitionHighWatermarks =
+                for(i <- 0 until numberOfHighWatermarks) yield ((hwFile.readUTF(), hwFile.readInt()) -> hwFile.readLong())
+              val hwOpt = partitionHighWatermarks.toMap.get((topic, partition))
+              hwOpt match {
+                case Some(hw) => hw
+                case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+                  "partition %d. Returning 0 as the highwatermark".format(partition))
+                0L
+              }
+            case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
+              System.exit(1)
+              -1L
+          }
+      }
+    }
+  }
+
+  def close() {
+    // flush to disk and close file handle
+    hwFile.close()
+  }
+}
\ No newline at end of file
Index: core/src/main/scala/kafka/server/ReplicaManager.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaManager.scala	(revision 1364025)
+++ core/src/main/scala/kafka/server/ReplicaManager.scala	(working copy)
@@ -33,14 +33,23 @@
   private val leaderReplicaLock = new ReentrantLock()
   private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
   private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+  private val highWatermarkCheckpointScheduler = new KafkaScheduler(1, "highwatermark-checkpoint-thread", true)
+  private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
+  info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
 
-  // start ISR expiration thread
-  isrExpirationScheduler.startUp
-  isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+  def startup() {
+    // start the highwatermark checkpoint thread
+    highWatermarkCheckpointScheduler.startUp
+    highWatermarkCheckpointScheduler.scheduleWithRate(checkpointHighwaterMarks, 0, config.defaultFlushIntervalMs)
+    // start ISR expiration thread
+    isrExpirationScheduler.startUp
+    isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+  }
 
   def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
     val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
-    val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+    val localReplica = new Replica(config.brokerId, partition, topic, time,
+                                   Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
 
     val replicaOpt = partition.getReplica(config.brokerId)
     replicaOpt match {
@@ -87,7 +96,7 @@
   }
 
   def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
-    val remoteReplica = new Replica(replicaId, partition, topic)
+    val remoteReplica = new Replica(replicaId, partition, topic, time)
 
     val replicaAdded = partition.addReplica(remoteReplica)
     if(replicaAdded)
@@ -120,10 +129,10 @@
   def getPartition(topic: String, partitionId: Int): Option[Partition] =
     allReplicas.get((topic, partitionId))
 
-  def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+  def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
     // set the replica leo
     val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
-    partition.updateReplicaLEO(replica, fetchOffset)
+    partition.updateReplicaLeo(replica, fetchOffset)
   }
 
   def maybeIncrementLeaderHW(replica: Replica) {
@@ -166,7 +175,7 @@
     }
     replica.log match {
       case Some(log) =>  // log is already started
-        log.recoverUptoLastCheckpointedHW()
+        log.truncateTo(readCheckpointedHighWatermark(replica.topic, replica.partition.partitionId))
       case None =>
     }
     // get leader for this replica
@@ -223,7 +232,7 @@
     val replicaOpt = getReplica(topic, partition, replicaId)
     replicaOpt match {
       case Some(replica) =>
-        updateReplicaLEO(replica, offset)
+        updateReplicaLeo(replica, offset)
         // check if this replica needs to be added to the ISR
         if(checkIfISRCanBeExpanded(replica)) {
           val newISR = replica.partition.inSyncReplicas + replica
@@ -236,18 +245,47 @@
     }
   }
 
-  def recordLeaderLogUpdate(topic: String, partition: Int) = {
+  def recordLeaderLogUpdate(topic: String, partition: Int, logEndOffset: Long) = {
     val replicaOpt = getReplica(topic, partition, config.brokerId)
     replicaOpt match {
-      case Some(replica) =>
-        replica.logEndOffsetUpdateTime(Some(time.milliseconds))
+      case Some(replica) => replica.logEndOffset(Some(logEndOffset))
       case None =>
         throw new IllegalStateException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
     }
   }
 
+  /**
+   * Flushes the highwatermark value for all partitions to the highwatermark file
+   */
+  def checkpointHighwaterMarks() {
+    val highwaterMarksForAllPartitions = allReplicas.map { partition =>
+      val topic = partition._1._1
+      val partitionId = partition._1._2
+      val localReplicaOpt = partition._2.getReplica(config.brokerId)
+      val hw = localReplicaOpt match {
+        case Some(localReplica) => localReplica.highWatermark()
+        case None =>
+          error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
+              " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
+          0L
+      }
+      (topic, partitionId) -> hw
+    }.toMap
+    highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
+    info("Checkpointed highwatermarks")
+  }
+
+  /**
+   * Reads the checkpointed highWatermarks for all partitions
+   * @return checkpointed value of highwatermark for topic, partition. If one doesn't exist, returns 0
+   */
+  def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
+
   def close() {
     isrExpirationScheduler.shutdown()
     replicaFetcherManager.shutdown()
+    highWatermarkCheckpointScheduler.shutdown()
+    checkpointHighwaterMarks()
+    highwaterMarkCheckpoint.close()
   }
 }
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1364025)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -171,7 +171,7 @@
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
-          replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
+          replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition, log.logEndOffset)
           offsets(msgIndex) = log.nextAppendOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
