Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 1375740)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(working copy)
@@ -22,7 +22,7 @@
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
-import kafka.utils.{Utils, TestUtils, Range}
+import kafka.utils.{Utils, TestUtils, Range, SystemTime, MockTime}
 import kafka.common.OffsetOutOfRangeException
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 
@@ -44,18 +44,69 @@
     for(offset <- offsets)
       new File(dir, Integer.toString(offset) + Log.FileSuffix).createNewFile()
   }
-  
+
+  /** Test that the time based log segment roll works. */
   @Test
+  def testTimeBasedLogRoll() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val rollMs = 1 * 60 * 60 * 1000L
+    val time: MockTime = new MockTime()
+
+    // create a log
+    val log = new Log(logDir, time, 1000, 1000, rollMs, false)
+
+    // segment age is less than its limit
+    log.append(set)
+    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+    log.append(set)
+    assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
+
+    // segment expires in age
+    time.currentMs += rollMs + 1
+    log.append(set)
+    assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+
+    time.currentMs += rollMs + 1
+    val blank = Array[Message]()
+    log.append(new ByteBufferMessageSet(blank:_*))
+    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+
+    time.currentMs += rollMs + 1
+    // the last segment expired in age, but was blank. So new segment should not be generated
+    log.append(set)
+    assertEquals("There should be exactly 3 segments.", 3, log.numberOfSegments)
+  }
+
+  @Test
+  def testSizeBasedLogRoll() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val setSize = set.sizeInBytes
+    val msgPerSeg = 10
+    val segSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
+
+    // create a log
+    val log = new Log(logDir, SystemTime, segSize, 1000, 10000, false)
+    assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    // segments expire in size
+    for (i<- 1 to (msgPerSeg + 1)) {
+      log.append(set)
+    }
+    assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments)
+  }
+
+  @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, false)
+    new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
   }
   
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, false)
+      new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: IllegalStateException => "This is good"
@@ -64,7 +115,7 @@
   
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -81,7 +132,7 @@
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, false)
+    val log = new Log(logDir, SystemTime, 1024, 1000, 24 * 7 * 60 * 60 * 1000L, false)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -101,7 +152,7 @@
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, false)
+    val log = new Log(logDir, SystemTime, 100, 1000, 24 * 7 * 60 * 60 * 1000L, false)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -156,7 +207,7 @@
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, SystemTime, 100, 1000, 24 * 7 * 60 * 60 * 1000L, false)
       val curOffset = log.nextAppendOffset
       assertEquals(curOffset, 0)
 
@@ -169,7 +220,7 @@
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, false)
+      val log = new Log(logDir, SystemTime, 100, 1000, 24 * 7 * 60 * 60 * 1000L, false)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -192,6 +243,9 @@
     }
   }
 
+
+
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 
Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1375740)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -28,6 +28,7 @@
 class LogManagerTest extends JUnitSuite {
 
   val time: MockTime = new MockTime()
+  val maxSegAge = 100
   val maxLogAge = 1000
   var logDir: File = null
   var logManager: LogManager = null
@@ -41,7 +42,7 @@
                    override val enableZookeeper = false
                    override val flushInterval = 100
                  }
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
   }
@@ -70,7 +71,7 @@
   }
 
   @Test
-  def testCleanupExpiredSegments() {
+  def testCleanupExpiredSegmentsWithSizeBasedLogRoll() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L
     for(i <- 0 until 1000) {
@@ -101,7 +102,56 @@
   }
 
   @Test
-  def testCleanupSegmentsToMaintainSize() {
+  def testCleanupExpiredSegmentsWithTimeBasedLogRoll() {
+    val rollHours = 1
+    val rollMs = 1000L * 60 * 60 * rollHours
+    val retentionHours = 24
+    val retentionMs = 1000L * 60 * 60 * retentionHours
+    val props = TestUtils.createBrokerConfig(0, -1)
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    logManager.close
+    Thread.sleep(100)
+    config = new KafkaConfig(props) {
+      override val logFileSize = 1024
+      override val enableZookeeper = false
+      override val logRollHours = rollHours
+      override val logRetentionHours = retentionHours
+    }
+    logManager = new LogManager(config, null, time, rollMs, -1, retentionMs, false)
+    logManager.startup
+
+    // create a log
+    val log = logManager.getOrCreateLog("cleanup", 0)
+    var offset = 0L
+    for(i <- 0 until 10) {
+      time.currentMs += rollMs + 1
+      log.append(set)
+      offset += set.sizeInBytes
+    }
+    log.flush
+
+    assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
+
+    // update the last modified time of all log segments
+    val logSegments = log.segments.view
+    logSegments.foreach(s => s.file.setLastModified(time.currentMs))
+
+    time.currentMs += retentionMs + 1
+    logManager.cleanupLogs()
+    assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    try {
+      log.read(0, 1024)
+      fail("Should get exception from fetching earlier.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+    // log should still be appendable
+    log.append(set)
+  }
+
+  @Test
+  def testCleanupSegmentsToMaintainSizeWithSizeBasedLogRoll() {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
     val retentionHours = 1
     val retentionMs = 1000 * 60 * 60 * retentionHours
@@ -111,11 +161,11 @@
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
       override val enableZookeeper = false
-      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
       override val flushInterval = 100
     }
-    logManager = new LogManager(config, null, time, -1, retentionMs, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, retentionMs, false)
     logManager.startup
 
     // create a log
@@ -132,12 +182,12 @@
     log.flush
     Thread.sleep(2000)
 
-    // should be exactly 100 full segments + 1 new empty one
-    assertEquals("There should be example 101 segments.", 100 + 1, log.numberOfSegments)
+    // should be exactly 100 full segments
+    assertEquals("There should be exactly 100 segments.", 100, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
     logManager.cleanupLogs()
-    assertEquals("Now there should be exactly 7 segments", 6 + 1, log.numberOfSegments)
+    assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -150,6 +200,63 @@
   }
 
   @Test
+  def testCleanupSegmentsToMaintainSizeWithTimeBasedLogRoll() {
+    val rollHours = 1
+    val rollMs = 1000L * 60 * 60 * rollHours
+    val retentionHours = 24
+    val retentionMs = 1000 * 60 * 60 * retentionHours
+    val props = TestUtils.createBrokerConfig(0, -1)
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val setSize = set.sizeInBytes
+    logManager.close
+    Thread.sleep(100)
+    config = new KafkaConfig(props) {
+      override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
+      override val enableZookeeper = false
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
+      override val logRetentionHours = retentionHours
+      override val flushInterval = 100
+      override val logRollHours = rollHours
+    }
+    logManager = new LogManager(config, null, time, rollMs, -1, retentionMs, false)
+    logManager.startup
+
+    // create a log
+    val log = logManager.getOrCreateLog("cleanup", 0)
+    var offset = 0L
+
+    // add a bunch of messages that should be larger than the retentionSize
+    // each segment expires in time, but not in size
+    log.append(set)
+    offset += set.sizeInBytes
+    for(i <- 1 until 100) {
+      if (i % 5 == 0)
+        time.currentMs += rollMs + 1
+      log.append(set)
+      offset += set.sizeInBytes
+    }
+    // flush to make sure it's written to disk, then sleep to confirm
+    log.flush
+    Thread.sleep(2000)
+
+    // should be exactly 20 full segments
+    assertEquals("There should be exactly 20 segments.", 20, log.numberOfSegments)
+
+    // this cleanup shouldn't find any expired segments but should delete some to reduce size
+    logManager.cleanupLogs()
+    assertEquals("Now there should be exactly 11 segments", 11, log.numberOfSegments)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    try {
+      log.read(0, 1024)
+      fail("Should get exception from fetching earlier.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+    // log should still be appendable
+    log.append(set)
+  }
+
+  @Test
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.close
@@ -161,7 +268,7 @@
                    override val flushInterval = Int.MaxValue
                    override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
                  }
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
     logManager.startup
     val log = logManager.getOrCreateLog("timebasedflush", 0)
     for(i <- 0 until 200) {
@@ -185,7 +292,7 @@
                    override val flushInterval = 100
                  }
     
-    logManager = new LogManager(config, null, time, -1, maxLogAge, false)
+    logManager = new LogManager(config, null, time, maxSegAge, -1, maxLogAge, false)
     logManager.startup
     
     for(i <- 0 until 2) {
Index: core/src/test/scala/other/kafka/TestLogPerformance.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLogPerformance.scala	(revision 1375740)
+++ core/src/test/scala/other/kafka/TestLogPerformance.scala	(working copy)
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.message._
-import kafka.utils.{TestUtils, Utils}
+import kafka.utils.{TestUtils, Utils, SystemTime}
 
 object TestLogPerformance {
 
@@ -30,7 +30,7 @@
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, false)
+    val log = new Log(dir, SystemTime, 50*1024*1024, 5000000, 24 * 7 * 60 * 60 * 1000L, false)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1375740)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -90,8 +90,10 @@
 /**
  * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size 
  */
-private[log] class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long) extends Range {
+private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range {
+  private val startTime = time.milliseconds
   @volatile var deleted = false
+  def timeOfCreation = startTime
   def size: Long = messageSet.highWaterMark
   override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
 }
@@ -101,7 +103,8 @@
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int, val needRecovery: Boolean) extends Logging {
+private[log] class Log(val dir: File, val time: Time, val maxSize: Long,
+                       val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean) extends Logging {
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
@@ -120,7 +123,7 @@
 
   private val logStats = new LogStats(this)
 
-  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)  
+  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
 
   /* Load the log segments from the log files on disk */
   private def loadSegments(): SegmentList[LogSegment] = {
@@ -134,7 +137,7 @@
         val filename = file.getName()
         val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
         val messageSet = new FileMessageSet(file, false)
-        accum.add(new LogSegment(file, messageSet, start))
+        accum.add(new LogSegment(file, time, messageSet, start))
       }
     }
 
@@ -142,7 +145,7 @@
       // no existing segments, create a new mutable segment
       val newFile = new File(dir, Log.nameFromOffset(0))
       val set = new FileMessageSet(newFile, true)
-      accum.add(new LogSegment(newFile, set, 0))
+      accum.add(new LogSegment(newFile, time, set, 0))
     } else {
       // there is at least one existing segment, validate and recover them/it
       // sort segments into ascending order for fast searching
@@ -159,7 +162,7 @@
       val last = accum.remove(accum.size - 1)
       last.messageSet.close()
       info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
-      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
+      val mutable = new LogSegment(last.file, time, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start)
       accum.add(mutable)
     }
     new SegmentList(accum.toArray(new Array[LogSegment](accum.size)))
@@ -225,10 +228,11 @@
     // they are valid, insert them in the log
     lock synchronized {
       try {
-        val segment = segments.view.last
+        var segment = segments.view.last
+        maybeRoll(segment)
+        segment = segments.view.last
         segment.messageSet.append(validMessages)
         maybeFlush(numberOfMessages)
-        maybeRoll(segment)
       }
       catch {
         case e: IOException =>
@@ -299,7 +303,8 @@
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
-    if(segment.messageSet.sizeInBytes > maxSize)
+    if((segment.messageSet.sizeInBytes > maxSize) ||
+       ((time.milliseconds - segment.timeOfCreation > rollIntervalMs) && (segment.messageSet.sizeInBytes > 0)))
       roll()
   }
 
@@ -315,7 +320,7 @@
         newFile.delete()
       }
       debug("Rolling log '" + name + "' to " + newFile.getName())
-      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+      segments.append(new LogSegment(newFile, time, new FileMessageSet(newFile, true), newOffset))
     }
   }
 
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1375740)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -33,10 +33,11 @@
 private[kafka] class LogManager(val config: KafkaConfig,
                                 private val scheduler: KafkaScheduler,
                                 private val time: Time,
+                                val logRollSegDefaultAgeMs: Long,
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
                                 needRecovery: Boolean) extends Logging {
-  
+
   val logDir: File = new File(config.logDir)
   private val numPartitions = config.numPartitions
   private val maxSize: Long = config.logFileSize
@@ -49,8 +50,9 @@
   private val startupLatch: CountDownLatch = if (config.enableZookeeper) new CountDownLatch(1) else null
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
-  private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
   private val logRetentionSize = config.logRetentionSize
+  private val logRetentionMsMap = getMsMap(config.logRetentionHoursMap)
+  private val logRollMsMap = getMsMap(config.logRollHoursMap)
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -67,7 +69,9 @@
         warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
       } else {
         info("Loading log '" + dir.getName() + "'")
-        val log = new Log(dir, maxSize, flushInterval, needRecovery)
+        val topic = Utils.getTopicPartition(dir.getName)._1
+        val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollSegDefaultAgeMs)
+        val log = new Log(dir, time, maxSize, flushInterval, rollIntervalMs, needRecovery)
         val topicPartion = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartion._1)
@@ -108,10 +112,11 @@
 
   case object StopActor
 
-  private def getLogRetentionMSMap(logRetentionHourMap: Map[String, Int]) : Map[String, Long] = {
+  private def getMsMap(hoursMap: Map[String, Int]) : Map[String, Long] = {
     var ret = new mutable.HashMap[String, Long]
-    for ( (topic, hour) <- logRetentionHourMap )
+    for ( (topic, hour) <- hoursMap ) {
       ret.put(topic, hour * 60 * 60 * 1000L)
+    }
     ret
   }
 
@@ -146,7 +151,8 @@
     logCreationLock synchronized {
       val d = new File(logDir, topic + "-" + partition)
       d.mkdirs()
-      new Log(d, maxSize, flushInterval, false)
+      val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollSegDefaultAgeMs)
+      new Log(d, time, maxSize, flushInterval, rollIntervalMs, false)
     }
   }
   
@@ -236,7 +242,7 @@
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
     val topic = Utils.getTopicPartition(log.dir.getName)._1
-    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val logCleanupThresholdMS = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
     val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
     val total = deleteSegments(log, toBeDeleted)
     total
Index: core/src/main/scala/kafka/utils/Utils.scala
===================================================================
--- core/src/main/scala/kafka/utils/Utils.scala	(revision 1375740)
+++ core/src/main/scala/kafka/utils/Utils.scala	(working copy)
@@ -566,7 +566,7 @@
   }
 
   /**
-   * This method gets comma seperated values which contains key,value pairs and returns a map of
+   * This method gets comma separated values which contains key,value pairs and returns a map of
    * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
    */
   private def getCSVMap[K, V](allCSVals: String, exceptionMsg:String, successMsg:String) :Map[K, V] = {
@@ -595,12 +595,18 @@
     }
   }
 
-  def getTopicRentionHours(retentionHours: String) : Map[String, Int] = {
+  def getTopicRetentionHours(retentionHours: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.log.retention.hours in server.properties: "
     val successMsg =  "The retention hour for "
     getCSVMap(retentionHours, exceptionMsg, successMsg)
   }
 
+  def getTopicRollHours(rollHours: String) : Map[String, Int] = {
+    val exceptionMsg = "Malformed token for topic.log.roll.hours in server.properties: "
+    val successMsg =  "The roll hour for "
+    getCSVMap(rollHours, exceptionMsg, successMsg)
+  }
+
   def getTopicFlushIntervals(allIntervals: String) : Map[String, Int] = {
     val exceptionMsg = "Malformed token for topic.flush.Intervals.ms in server.properties: "
     val successMsg =  "The flush interval for "
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1375740)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -57,18 +57,24 @@
   
   /* the maximum size of a single log file */
   val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))
-  
+
+  /* the maximum time before a new log segment is rolled out */
+  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, 24*7))
+
+  /* the number of hours before rolling out a new log segment for some specific topic */
+  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))
+
   /* the number of messages accumulated on a log partition before messages are flushed to disk */
   val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))
   
   /* the number of hours to keep a log file before deleting it */
-  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
+  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))
   
   /* the maximum size of the log before deleting it */
   val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
 
   /* the number of hours to keep a log file before deleting it for some specific topic*/
-  val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
+  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
 
   /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
   val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1375740)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -58,6 +58,7 @@
     logManager = new LogManager(config,
                                 scheduler,
                                 SystemTime,
+                                1000L * 60 * 60 * config.logRollHours,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
