diff -r -p kafka-0.7.2-incubating-src-orig/core/src/main/scala/kafka/log/Log.scala kafka-0.7.2-incubating-src/core/src/main/scala/kafka/log/Log.scala *** kafka-0.7.2-incubating-src-orig/core/src/main/scala/kafka/log/Log.scala 2012-09-28 18:38:11.000000000 -0700 --- kafka-0.7.2-incubating-src/core/src/main/scala/kafka/log/Log.scala 2013-05-07 16:05:10.000000000 -0700 *************** private[log] object Log { *** 88,111 **** } /** ! * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size */ private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range { ! var firstAppendTime: Option[Long] = None @volatile var deleted = false def size: Long = messageSet.highWaterMark - private def updateFirstAppendTime() { - if (firstAppendTime.isEmpty) - firstAppendTime = Some(time.milliseconds) - } - def append(messages: ByteBufferMessageSet) { if (messages.sizeInBytes > 0) { messageSet.append(messages) ! updateFirstAppendTime() } ! } override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")" } --- 88,109 ---- } /** ! * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size */ private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range { ! var lastAppendTime: Option[Long] = None ! if (file.exists && file.length > 0) { ! lastAppendTime = Option(file.lastModified()) ! } @volatile var deleted = false def size: Long = messageSet.highWaterMark def append(messages: ByteBufferMessageSet) { if (messages.sizeInBytes > 0) { messageSet.append(messages) ! lastAppendTime = Some(time.milliseconds) } ! } override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")" } *************** private[log] class Log(val dir: File, va *** 313,323 **** def getHighwaterMark: Long = segments.view.last.messageSet.highWaterMark /** ! * Roll the log over if necessary */ private def maybeRoll(segment: LogSegment) { if((segment.messageSet.sizeInBytes > maxSize) || ! ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs))) roll() } --- 311,324 ---- def getHighwaterMark: Long = segments.view.last.messageSet.highWaterMark /** ! * Roll the log over if segment > maxSize or we have passed the ! * rollIntervalMs boundary. */ private def maybeRoll(segment: LogSegment) { if((segment.messageSet.sizeInBytes > maxSize) || ! ((segment.lastAppendTime.isDefined) ! && ((time.milliseconds / rollIntervalMs) ! != (segment.lastAppendTime.get / rollIntervalMs)))) roll() } diff -r -p kafka-0.7.2-incubating-src-orig/core/src/test/scala/unit/kafka/log/LogTest.scala kafka-0.7.2-incubating-src/core/src/test/scala/unit/kafka/log/LogTest.scala *** kafka-0.7.2-incubating-src-orig/core/src/test/scala/unit/kafka/log/LogTest.scala 2012-09-28 18:38:10.000000000 -0700 --- kafka-0.7.2-incubating-src/core/src/test/scala/unit/kafka/log/LogTest.scala 2013-05-07 16:08:49.000000000 -0700 *************** class LogTest extends JUnitSuite { *** 67,76 **** assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) // segment expires in age ! time.currentMs += rollMs + 1 log.append(set) assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) time.currentMs += rollMs + 1 val blank = Array[Message]() log.append(new ByteBufferMessageSet(blank:_*)) --- 67,81 ---- assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) // segment expires in age ! // It expires once it enters the next time window, i.e. ! // when time.currentMs / rollMs changes, which might be less than rollMs. ! val advanceBy = (rollMs - (time.currentMs % rollMs)) + 1 ! assertTrue(advanceBy <= rollMs) ! time.currentMs += advanceBy log.append(set) assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) + // segment expires again, by moving forward rollMs + 1 time.currentMs += rollMs + 1 val blank = Array[Message]() log.append(new ByteBufferMessageSet(blank:_*)) *************** class LogTest extends JUnitSuite { *** 83,88 **** --- 88,123 ---- } @Test + def testTimeBasedLogRollFromRecoveredFile() { + val set = TestUtils.singleMessageSet("test".getBytes()) + val rollMs = 1 * 60 * 60L + val time: MockTime = new MockTime() + + // create a log + val log = new Log(logDir, time, 1000, config.maxMessageSize, 1000, rollMs, false) + time.currentMs += rollMs + 1 + + // segment age is less than its limit + log.append(set) + assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + + log.append(set) + assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + + // Another log should grab the file from the first one, and keep going + // Log rolling should still work, even though we 'recovered' the segment + val log2 = new Log(logDir, time, 1000, config.maxMessageSize, 1000, rollMs, false) + // segment expires in age + // It expires once it enters the next time window, i.e. + // when time.currentMs / rollMs changes, which might be less than rollMs. + val advanceBy = (rollMs - (time.currentMs % rollMs)) + 1 + assertTrue(advanceBy <= rollMs) + time.currentMs += advanceBy + log2.append(set) + assertEquals("There should be exactly 2 segments.", 2, log2.numberOfSegments) + } + + @Test def testSizeBasedLogRoll() { val set = TestUtils.singleMessageSet("test".getBytes()) val setSize = set.sizeInBytes