From cd6d3f5eba2d81596ed290a2e64d699461f07eb1 Mon Sep 17 00:00:00 2001 From: Dan Frankowski Date: Sun, 28 Jul 2013 07:34:06 -0700 Subject: [PATCH] Change log to roll when time / rollIntervalMs changes. --- core/src/main/scala/kafka/log/Log.scala | 23 ++++++++------- core/src/test/scala/unit/kafka/log/LogTest.scala | 37 +++++++++++++++++++++++- 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5450699..7c906c1 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -88,24 +88,22 @@ private[log] object Log { } /** - * A segment file in the log directory. Each log semgment consists of an open message set, a start offset and a size + * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size */ private[log] class LogSegment(val file: File, val time: Time, val messageSet: FileMessageSet, val start: Long) extends Range { - var firstAppendTime: Option[Long] = None + var lastAppendTime: Option[Long] = None + if (file.exists && file.length > 0) { + lastAppendTime = Option(file.lastModified()) + } @volatile var deleted = false def size: Long = messageSet.highWaterMark - private def updateFirstAppendTime() { - if (firstAppendTime.isEmpty) - firstAppendTime = Some(time.milliseconds) - } - def append(messages: ByteBufferMessageSet) { if (messages.sizeInBytes > 0) { messageSet.append(messages) - updateFirstAppendTime() + lastAppendTime = Some(time.milliseconds) } - } + } override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")" } @@ -313,11 +311,14 @@ private[log] class Log(val dir: File, val time: Time, val maxSize: Long, val max def getHighwaterMark: Long = segments.view.last.messageSet.highWaterMark /** - * Roll the log over if necessary + * Roll the log over if segment > maxSize or we have passed the + * rollIntervalMs boundary. */ private def maybeRoll(segment: LogSegment) { if((segment.messageSet.sizeInBytes > maxSize) || - ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs))) + ((segment.lastAppendTime.isDefined) + && ((time.milliseconds / rollIntervalMs) + != (segment.lastAppendTime.get / rollIntervalMs)))) roll() } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index f448f60..b29a391 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -67,10 +67,15 @@ class LogTest extends JUnitSuite { assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) // segment expires in age - time.currentMs += rollMs + 1 + // It expires once it enters the next time window, i.e. + // when time.currentMs / rollMs changes, which might be less than rollMs. + val advanceBy = (rollMs - (time.currentMs % rollMs)) + 1 + assertTrue(advanceBy <= rollMs) + time.currentMs += advanceBy log.append(set) assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) + // segment expires again, by moving forward rollMs + 1 time.currentMs += rollMs + 1 val blank = Array[Message]() log.append(new ByteBufferMessageSet(blank:_*)) @@ -83,6 +88,36 @@ class LogTest extends JUnitSuite { } @Test + def testTimeBasedLogRollFromRecoveredFile() { + val set = TestUtils.singleMessageSet("test".getBytes()) + val rollMs = 1 * 60 * 60L + val time: MockTime = new MockTime() + + // create a log + val log = new Log(logDir, time, 1000, config.maxMessageSize, 1000, rollMs, false) + time.currentMs += rollMs + 1 + + // segment age is less than its limit + log.append(set) + assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + + log.append(set) + assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + + // Another log should grab the file from the first one, and keep going + // Log rolling should still work, even though we 'recovered' the segment + val log2 = new Log(logDir, time, 1000, config.maxMessageSize, 1000, rollMs, false) + // segment expires in age + // It expires once it enters the next time window, i.e. + // when time.currentMs / rollMs changes, which might be less than rollMs. + val advanceBy = (rollMs - (time.currentMs % rollMs)) + 1 + assertTrue(advanceBy <= rollMs) + time.currentMs += advanceBy + log2.append(set) + assertEquals("There should be exactly 2 segments.", 2, log2.numberOfSegments) + } + + @Test def testSizeBasedLogRoll() { val set = TestUtils.singleMessageSet("test".getBytes()) val setSize = set.sizeInBytes -- 1.8.1.1