diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index f634896..79b6ab5 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -164,7 +164,8 @@ private[kafka] class Log(val dir: File, logSegments.add(new LogSegment(dir = dir, startOffset = offset, indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) + maxIndexSize = maxIndexSize, + time = time)) } } } @@ -174,7 +175,8 @@ private[kafka] class Log(val dir: File, logSegments.add(new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) + maxIndexSize = maxIndexSize, + time = time)) } else { // there is at least one existing segment, validate and recover them/it // sort segments into ascending order for fast searching @@ -482,7 +484,7 @@ private[kafka] class Log(val dir: File, if(segment.messageSet.sizeInBytes > maxLogFileSize) { info("Rolling %s due to full data log".format(name)) roll() - } else if((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) { + } else if((segment.lastAppendTime.isDefined) && ((time.milliseconds / rollIntervalMs) != (segment.lastAppendTime.get / rollIntervalMs))) { info("Rolling %s due to time based rolling".format(name)) roll() } else if(segment.index.isFull) { @@ -526,7 +528,8 @@ private[kafka] class Log(val dir: File, val segment = new LogSegment(dir, startOffset = newOffset, indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize) + maxIndexSize = maxIndexSize, + time = time) segments.append(segment) segment } @@ -668,7 +671,8 @@ private[kafka] class Log(val dir: File, segments.append(new LogSegment(dir, newOffset, indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize)) + maxIndexSize = maxIndexSize, + time = time)) this.nextOffset.set(newOffset) } } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 6f11a8b..0e8f1b4 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -23,49 +23,44 @@ import kafka.utils._ /** * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing - * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each + * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in * any previous segment. - * - * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. + * + * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. */ @nonthreadsafe -class LogSegment(val messageSet: FileMessageSet, - val index: OffsetIndex, - val start: Long, +class LogSegment(val messageSet: FileMessageSet, + val index: OffsetIndex, + val start: Long, val indexIntervalBytes: Int, time: Time) extends Range with Logging { - - var firstAppendTime: Option[Long] = - if (messageSet.sizeInBytes > 0) - Some(time.milliseconds) + + var lastAppendTime: Option[Long] = + if (messageSet.file.exists && messageSet.file.length > 0) + Option(messageSet.file.lastModified()) else None - + /* the number of bytes since we last added an entry in the offset index */ var bytesSinceLastIndexEntry = 0 - + @volatile var deleted = false - - def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = - this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), + + def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, time: Time) = + this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, - SystemTime) - + time) + /* Return the size in bytes of this log segment */ def size: Long = messageSet.sizeInBytes() - def updateFirstAppendTime() { - if (firstAppendTime.isEmpty) - firstAppendTime = Some(time.milliseconds) - } - /** * Append the given messages starting with the given offset. Add * an entry to the index if needed. - * + * * It is assumed this method is being called from within a lock */ def append(offset: Long, messages: ByteBufferMessageSet) { @@ -78,11 +73,11 @@ class LogSegment(val messageSet: FileMessageSet, } // append the messages messageSet.append(messages) - updateFirstAppendTime() + lastAppendTime = Some(time.milliseconds) this.bytesSinceLastIndexEntry += messages.sizeInBytes } } - + /** * Find the physical file position for the least offset >= the given offset. If no offset is found * that meets this criteria before the end of the log, return null. @@ -91,7 +86,7 @@ class LogSegment(val messageSet: FileMessageSet, val mapping = index.lookup(offset) messageSet.searchFor(offset, mapping.position) } - + /** * Read a message set from this segment beginning with the first offset * greater than or equal to the startOffset. The message set will include @@ -102,16 +97,16 @@ class LogSegment(val messageSet: FileMessageSet, throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) if(maxSize == 0) return MessageSet.Empty - + val logSize = messageSet.sizeInBytes // this may change, need to save a consistent copy val startPosition = translateOffset(startOffset) - + // if the start position is already off the end of the log, return MessageSet.Empty if(startPosition == null) return MessageSet.Empty - + // calculate the length of the message set to read based on whether or not they gave us a maxOffset - val length = + val length = maxOffset match { case None => // no max offset, just use the max size they gave unmolested @@ -121,12 +116,12 @@ class LogSegment(val messageSet: FileMessageSet, if(offset < startOffset) throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset)) val mapping = translateOffset(offset) - val endPosition = + val endPosition = if(mapping == null) logSize // the max offset is off the end of the log, use the end of the file else mapping.position - min(endPosition - startPosition.position, maxSize) + min(endPosition - startPosition.position, maxSize) } } messageSet.read(startPosition.position, length) @@ -135,7 +130,7 @@ class LogSegment(val messageSet: FileMessageSet, override def toString() = "LogSegment(start=" + start + ", size=" + size + ")" /** - * Truncate off all index and log entries with offsets greater than or equal to the current offset. + * Truncate off all index and log entries with offsets greater than or equal to the current offset. */ def truncateTo(offset: Long) { val mapping = translateOffset(offset) @@ -146,10 +141,10 @@ class LogSegment(val messageSet: FileMessageSet, index.resize(index.maxIndexSize) messageSet.truncateTo(mapping.position) if (messageSet.sizeInBytes == 0) - firstAppendTime = None + lastAppendTime = None bytesSinceLastIndexEntry = 0 } - + /** * Calculate the offset that would be used for the next message to be append to this segment. * Note that this is expensive. @@ -161,7 +156,7 @@ class LogSegment(val messageSet: FileMessageSet, case Some(last) => last.nextOffset } } - + /** * Flush this log segment to disk */ @@ -169,7 +164,7 @@ class LogSegment(val messageSet: FileMessageSet, messageSet.flush() index.flush() } - + /** * Close this log segment */ @@ -177,5 +172,5 @@ class LogSegment(val messageSet: FileMessageSet, Utils.swallow(index.close) Utils.swallow(messageSet.close) } - + } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 4ed88e8..661dbe5 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -30,7 +30,7 @@ import scala.Some import kafka.server.KafkaConfig class LogTest extends JUnitSuite { - + var logDir: File = null val time = new MockTime var config: KafkaConfig = null @@ -46,7 +46,7 @@ class LogTest extends JUnitSuite { def tearDown() { Utils.rm(logDir) } - + def createEmptyLogs(dir: File, offsets: Int*) { for(offset <- offsets) { Log.logFilename(dir, offset).createNewFile() @@ -73,10 +73,15 @@ class LogTest extends JUnitSuite { assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) // segment expires in age - time.currentMs += rollMs + 1 + // It expires once it enters the next time window, i.e. + // when time.currentMs / rollMs changes, which might be less than rollMs. + val advanceBy = (rollMs - (time.currentMs % rollMs)) + 1 + assertTrue(advanceBy <= rollMs) + time.currentMs += advanceBy log.append(set) assertEquals("There should be exactly 2 segments.", 2, log.numberOfSegments) + // segment expires again, by moving forward rollMs + 1 time.currentMs += rollMs + 1 val blank = Array[Message]() log.append(new ByteBufferMessageSet(new Message("blah".getBytes))) @@ -89,6 +94,36 @@ class LogTest extends JUnitSuite { } @Test + def testTimeBasedLogRollFromRecoveredFile() { + val set = TestUtils.singleMessageSet("test".getBytes()) + val rollMs = 1 * 60 * 60L + val time: MockTime = new MockTime() + + // create a log + val log = new Log(logDir, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = false, time = time) + time.currentMs += rollMs + 1 + + // segment age is less than its limit + log.append(set) + assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + + log.append(set) + assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + + // Another log should grab the file from the first one, and keep going + // Log rolling should still work, even though we 'recovered' the segment + val log2 = new Log(logDir, 1000, config.messageMaxBytes, 1000, rollMs, needsRecovery = true, time = time) + // segment expires in age + // It expires once it enters the next time window, i.e. + // when time.currentMs / rollMs changes, which might be less than rollMs. + val advanceBy = (rollMs - (time.currentMs % rollMs)) + 1 + assertTrue(advanceBy <= rollMs) + time.currentMs += advanceBy + log2.append(set) + assertEquals("There should be exactly 2 segments.", 2, log2.numberOfSegments) + } + + @Test def testSizeBasedLogRoll() { val set = TestUtils.singleMessageSet("test".getBytes()) val setSize = set.sizeInBytes @@ -168,19 +203,19 @@ class LogTest extends JUnitSuite { val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)) assertEquals("Should be no more messages", 0, lastRead.size) } - + /** Test the case where we have compressed batches of messages */ @Test def testCompressedMessages() { /* this log should roll after every messageset */ val log = new Log(logDir, 10, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) - + /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) - + def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message) - + /* we should always get the first message in the compressed set when reading any offset in the set */ assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset) assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset) @@ -202,7 +237,7 @@ class LogTest extends JUnitSuite { assertContains(makeRanges(5,8), 5) assertContains(makeRanges(5,8), 6) } - + @Test def testEdgeLogRollsStartingAtZero() { // first test a log segment starting at 0 @@ -226,7 +261,7 @@ class LogTest extends JUnitSuite { for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(i.toString.getBytes)) val curOffset = log.logEndOffset - + // time goes by; the log file is deleted log.markDeletedWhile(_ => true) @@ -262,7 +297,7 @@ class LogTest extends JUnitSuite { case e:MessageSizeTooLargeException => // this is good } } - + @Test def testLogRecoversToCorrectOffset() { val numMessages = 100 @@ -276,15 +311,15 @@ class LogTest extends JUnitSuite { val lastIndexOffset = log.segments.view.last.index.lastOffset val numIndexEntries = log.segments.view.last.index.entries log.close() - + // test non-recovery case log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries) log.close() - - // test + + // test log = new Log(logDir, segmentSize, config.messageMaxBytes, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset) @@ -305,10 +340,10 @@ class LogTest extends JUnitSuite { for (i<- 1 to msgPerSeg) log.append(set) - + assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments) assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset) - + val lastOffset = log.logEndOffset val size = log.size log.truncateTo(log.logEndOffset) // keep the entire log @@ -326,7 +361,7 @@ class LogTest extends JUnitSuite { for (i<- 1 to msgPerSeg) log.append(set) - + assertEquals("Should be back to original offset", log.logEndOffset, lastOffset) assertEquals("Should be back to original size", log.size, size) log.truncateAndStartWithNewOffset(log.logEndOffset - (msgPerSeg - 1)) @@ -371,14 +406,14 @@ class LogTest extends JUnitSuite { def testAppendWithoutOffsetAssignment() { for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) { logDir.mkdir() - var log = new Log(logDir, - maxLogFileSize = 64*1024, + var log = new Log(logDir, + maxLogFileSize = 64*1024, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 10000, + maxIndexSize = 1000, + indexIntervalBytes = 10000, needsRecovery = true) val messages = List("one", "two", "three", "four", "five", "six") - val ms = new ByteBufferMessageSet(compressionCodec = codec, + val ms = new ByteBufferMessageSet(compressionCodec = codec, offsetCounter = new AtomicLong(0), messages = messages.map(s => new Message(s.getBytes)):_*) val firstOffset = ms.toList.head.offset @@ -391,7 +426,7 @@ class LogTest extends JUnitSuite { log.delete() } } - + /** * When we open a log any index segments without an associated log segment should be deleted. */ @@ -399,22 +434,22 @@ class LogTest extends JUnitSuite { def testBogusIndexSegmentsAreRemoved() { val bogusIndex1 = Log.indexFilename(logDir, 0) val bogusIndex2 = Log.indexFilename(logDir, 5) - + val set = TestUtils.singleMessageSet("test".getBytes()) - val log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + val log = new Log(logDir, + maxLogFileSize = set.sizeInBytes * 5, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 1, + maxIndexSize = 1000, + indexIntervalBytes = 1, needsRecovery = false) - + assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertFalse("The second index file should have been deleted.", bogusIndex2.exists) - + // check that we can append to the log for(i <- 0 until 10) log.append(set) - + log.delete() } @@ -423,38 +458,38 @@ class LogTest extends JUnitSuite { val set = TestUtils.singleMessageSet("test".getBytes()) // create a log - var log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + var log = new Log(logDir, + maxLogFileSize = set.sizeInBytes * 5, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 10000, + maxIndexSize = 1000, + indexIntervalBytes = 10000, needsRecovery = true) - + // add enough messages to roll over several segments then close and re-open and attempt to truncate for(i <- 0 until 100) log.append(set) log.close() - log = new Log(logDir, - maxLogFileSize = set.sizeInBytes * 5, + log = new Log(logDir, + maxLogFileSize = set.sizeInBytes * 5, maxMessageSize = config.messageMaxBytes, - maxIndexSize = 1000, - indexIntervalBytes = 10000, + maxIndexSize = 1000, + indexIntervalBytes = 10000, needsRecovery = true) log.truncateTo(3) assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) assertEquals("Log end offset should be 3.", 3, log.logEndOffset) } - + def assertContains(ranges: Array[Range], offset: Long) = { Log.findRange(ranges, offset) match { - case Some(range) => + case Some(range) => assertTrue(range + " does not contain " + offset, range.contains(offset)) case None => fail("No range found, but expected to find " + offset) } } - + class SimpleRange(val start: Long, val size: Long) extends Range - + def makeRanges(breaks: Int*): Array[Range] = { val list = new ArrayList[Range] var prior = 0 @@ -464,5 +499,5 @@ class LogTest extends JUnitSuite { } list.toArray(new Array[Range](list.size)) } - + }