Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 1375368)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(working copy)
@@ -22,7 +22,7 @@
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, TestUtils, Range}
+import kafka.utils.{Utils, TestUtils, Range, SystemTime}
 import kafka.common.OffsetOutOfRangeException
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 
@@ -48,14 +48,14 @@
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, false)
+    new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
   }
   
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, false)
+      new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: IllegalStateException => "This is good"
@@ -64,7 +64,7 @@
   
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -81,7 +81,7 @@
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -101,7 +101,7 @@
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, false)
+    val log = new Log(logDir, SystemTime, 100, 1000, 24 * 7 * 60 * 60 * 1000L, false)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -156,7 +156,7 @@
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, SystemTime, 100, 1000, 24 * 7 * 60 * 60 * 1000L, false)
       val curOffset = log.nextAppendOffset
       assertEquals(curOffset, 0)
 
@@ -169,7 +169,7 @@
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, SystemTime, 100, 1000, 24 * 7 * 60 * 60 * 1000L, false)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
Index: core/src/test/scala/other/kafka/TestLogPerformance.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLogPerformance.scala	(revision 1375368)
+++ core/src/test/scala/other/kafka/TestLogPerformance.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.message._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{TestUtils, Utils, SystemTime}
 
 object TestLogPerformance {
 
@@ -30,7 +30,7 @@
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, false)
+    val log = new Log(dir, SystemTime, 50*1024*1024, 5000000, 24 * 7 * 60 * 60 * 1000L, false)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1375368)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -101,7 +101,8 @@
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
+private[log] class Log(val dir: File, val time: Time, val maxSize: Long,
+                       val flushInterval: Int, val retentionMSInterval: Long, val needRecovery: Boolean) extends Logging {
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
@@ -120,7 +121,7 @@
 
   private val logStats = new LogStats(this)
 
-  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
 
   /* Load the log segments from the log files on disk */
   private def loadSegments(): SegmentList[LogSegment] = {
@@ -299,7 +300,8 @@
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
-    if(segment.messageSet.sizeInBytes > maxSize)
+    val currentMS = time.milliseconds
+    if((segment.messageSet.sizeInBytes > maxSize) || (currentMS - segment.file.lastModified > retentionMSInterval))
       roll()
   }
 
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1375368)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -49,8 +49,8 @@
   private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
+  private val logRetentionSize = config.logRetentionSize
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
-  private val logRetentionSize = config.logRetentionSize
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -67,7 +67,9 @@
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val topic = Utils.getTopicPartition(dir.getName)._1
+        val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+        val log = new Log(dir, time, maxSize, flushInterval, logCleanupThresholdMS, needRecovery)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartion._1)
@@ -146,7 +148,8 @@
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false)
+      val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+      new Log(d, time, maxSize, flushInterval, logCleanupThresholdMS, false)
     }
   }
   
Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(revision 1375368)
+++ core/src/main/scala/kafka/utils/Utils.scala	(working copy)
@@ -595,7 +595,7 @@
     }
   }
 
-  def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
+  def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
     val successMsg =  "The retention hour for "
     getCSVMap(retentionHours, exceptionMsg, successMsg)
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1375368)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -68,7 +68,7 @@
   val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
