diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0cc402b..0b11fa5 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,6 +48,7 @@ import com.yammer.metrics.core.Gauge
 class Log(val dir: File,
           @volatile var config: LogConfig,
           @volatile var recoveryPoint: Long = 0L,
+          val cleanShutdownFileExists: Boolean,
           val scheduler: Scheduler,
           time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
 
@@ -165,6 +166,12 @@ class Log(val dir: File,
 
   private def recoverLog() {
     val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
+    val needsRecovery = !cleanShutdownFileExists
+    if(!needsRecovery) {
+      info("Found clean shutdown file. Skipping recovery for Log '%s'".format(name))
+      this.recoveryPoint = lastOffset
+      return
+    }
     if(lastOffset <= this.recoveryPoint) {
       info("Log '%s' is fully intact, skipping recovery".format(name))
       this.recoveryPoint = lastOffset
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d489e08..5dfe07d 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -48,6 +48,10 @@ class LogManager(val logDirs: Array[File],
                  private val time: Time) extends Logging {
 
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
+  /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility
+   * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
+  /** TODO: Get rid of CleanShutdownFile in 0.8.2 */
+  val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000
   private val logCreationLock = new Object
@@ -109,14 +113,16 @@ class LogManager(val logDirs: Array[File],
       /* load the logs */
       val subDirs = dir.listFiles()
       if(subDirs != null) {
+        val cleanShutDownFile = new File(dir, CleanShutdownFile)
         for(dir <- subDirs) {
-          if(dir.isDirectory){
+          if(dir.isDirectory) {
            info("Loading log '" + dir.getName + "'")
            val topicPartition = parseTopicPartitionName(dir.getName)
            val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
            val log = new Log(dir,
                              config,
                              recoveryPoints.getOrElse(topicPartition, 0L),
+                             cleanShutDownFile.exists(),
                              scheduler,
                              time)
            val previous = this.logs.put(topicPartition, log)
@@ -124,6 +130,7 @@ class LogManager(val logDirs: Array[File],
              throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          }
        }
+        cleanShutDownFile.delete()
      }
    }
  }
@@ -237,6 +244,7 @@ class LogManager(val logDirs: Array[File],
       log = new Log(dir,
                     config,
                     recoveryPoint = 0L,
+                    false,
                     scheduler,
                     time)
       logs.put(topicAndPartition, log)
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8fcd068..af6f025 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -38,6 +38,7 @@ object StressTestLog {
                                           maxMessageSize = Int.MaxValue,
                                           maxIndexSize = 1024*1024),
                       recoveryPoint = 0L,
+                      false,
                       scheduler = time.scheduler,
                       time = time)
     val writer = new WriterThread(log)
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index eeb8c88..ce24ed4 100644
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -200,7 +200,7 @@ object TestLinearWriteSpeed {
 
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable {
     Utils.rm(dir)
-    val log = new Log(dir, config, 0L, scheduler, SystemTime)
+    val log = new Log(dir, config, 0L, false, scheduler, SystemTime)
     def write(): Int = {
       log.append(messages, true)
       messages.sizeInBytes
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 5a312bf..ffcaa3f 100644
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -195,7 +195,7 @@ class CleanerTest extends JUnitSuite {
   }
 
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    new Log(dir = dir, config = config, recoveryPoint = 0L, false, scheduler = time.scheduler, time = time)
 
   def makeCleaner(capacity: Int) =
     new Cleaner(id = 0,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 1de3ef0..4b49e19 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -103,6 +103,7 @@ class LogCleanerIntegrationTest extends JUnitSuite {
       val log = new Log(dir = dir,
                         LogConfig(segmentSize = segmentSize, maxIndexSize = 100*1024, fileDeleteDelayMs = deleteDelay, dedupe = true),
                         recoveryPoint = 0L,
+                        false,
                         scheduler = time.scheduler,
                         time = time)
       logs.put(TopicAndPartition("log", i), log)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 140317c..48b5a7b 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -64,10 +64,11 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
 
     // create a log
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentMs = 1 * 60 * 60L), 
-                      recoveryPoint = 0L, 
-                      scheduler = time.scheduler, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentMs = 1 * 60 * 60L),
+                      recoveryPoint = 0L,
+                      false,
+                      scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     time.sleep(log.config.segmentMs + 1)
@@ -100,7 +101,7 @@ class LogTest extends JUnitSuite {
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, false, time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -116,7 +117,7 @@
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    val log = new Log(logDir, logConfig, recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig, recoveryPoint = 0L, false, time.scheduler, time = time)
     log.append(TestUtils.singleMessageSet("test".getBytes))
   }
 
@@ -125,9 +126,9 @@
    */
   @Test
   def testAppendAndReadWithSequentialOffsets() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, false, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
-    
+
     for(i <- 0 until messages.length)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
     for(i <- 0 until messages.length) {
@@ -137,17 +138,17 @@ class LogTest extends JUnitSuite {
     }
     assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
   }
-  
+
   /**
    * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
    * from any offset less than the logEndOffset including offsets not appended.
    */
   @Test
   def testAppendAndReadWithNonSequentialOffsets() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, false, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val messages = messageIds.map(id => new Message(id.toString.getBytes))
-    
+
     // now test the case that we give the offsets and use non-sequential offsets
     for(i <- 0 until messages.length)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
@@ -158,27 +159,27 @@ class LogTest extends JUnitSuite {
       assertEquals("Message should match appended.", messages(idx), read.message)
     }
   }
-  
+
   /**
    * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
    * Specifically we create a log where the last message in the first segment has offset 0. If we
-   * then read offset 1, we should expect this read to come from the second segment, even though the 
+   * then read offset 1, we should expect this read to come from the second segment, even though the
    * first segment has the greatest lower bound on the offset.
    */
   @Test
   def testReadAtLogGap() {
-    val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
-    
+    val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, false, time.scheduler, time = time)
+
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
-    
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
-    
+
     assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
   }
-  
+
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set
@@ -187,7 +188,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, false, time.scheduler, time = time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -210,7 +211,7 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, false, time.scheduler, time = time)
     val numMessages = 100
     val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
     messageSets.foreach(log.append(_))
@@ -226,34 +227,34 @@ class LogTest extends JUnitSuite {
     }
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
     assertEquals("Should be no more messages", 0, lastRead.size)
-    
+
     // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
     TestUtils.retry(1000L){
       assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
     }
   }
-  
+
   /**
    * Test reads at offsets that fall within compressed message set boundaries.
    */
   @Test
   def testCompressedMessages() {
     /* this log should roll after every messageset */
-    val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time)
-    
+    val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, false, time.scheduler, time = time)
+
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-    
+
     def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
-    
+
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
     assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
     assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
     assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
   }
-  
+
   /**
    * Test garbage collecting old segments
    */
@@ -262,10 +263,10 @@
     for(messagesToAppend <- List(0, 1, 25)) {
       logDir.mkdirs()
       // first test a log segment starting at 0
-      val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
+      val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, false, time.scheduler, time = time)
       for(i <- 0 until messagesToAppend)
         log.append(TestUtils.singleMessageSet(i.toString.getBytes))
-      
+
       var currOffset = log.logEndOffset
       assertEquals(currOffset, messagesToAppend)
 
@@ -276,17 +277,17 @@
       assertEquals("We should still have one segment left", 1, log.numberOfSegments)
       assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
-      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", 
+      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
                    log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
-      
+
       // cleanup the log
       log.delete()
     }
   }
 
   /**
-   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the 
+   * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
    * setting and checking that an exception is thrown.
    */
   @Test
@@ -296,7 +297,7 @@ class LogTest extends JUnitSuite {
 
     // append messages to log
     val maxMessageSize = second.sizeInBytes - 1
-    val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, false, time.scheduler, time = time)
 
     // should be able to append the small message
     log.append(first)
@@ -308,7 +309,7 @@ class LogTest extends JUnitSuite {
       case e: MessageSizeTooLargeException => // this is good
     }
   }
-  
+
   /**
   * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
   */
@@ -319,7 +320,7 @@ class LogTest extends JUnitSuite {
     val segmentSize = 7 * messageSize
     val indexInterval = 3 * messageSize
     val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, recoveryPoint = 0L, false, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@@ -327,21 +328,21 @@ class LogTest extends JUnitSuite {
     val numIndexEntries = log.activeSegment.index.entries
     val lastOffset = log.logEndOffset
     log.close()
-    
-    log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
+
+    log = new Log(logDir, config, recoveryPoint = lastOffset, false, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
-    
+
     // test recovery case
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, recoveryPoint = 0L, false, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
    log.close()
  }
-  
+
   /**
    * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
    */
@@ -350,17 +351,17 @@ class LogTest extends JUnitSuite {
     // publish the messages and close the log
     val numMessages = 200
     val config = logConfig.copy(segmentSize = 200, indexInterval = 1)
-    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    var log = new Log(logDir, config, recoveryPoint = 0L, false, time.scheduler, time)
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
     val indexFiles = log.logSegments.map(_.index.file)
     log.close()
-    
+
     // delete all the index files
     indexFiles.foreach(_.delete())
-    
+
     // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    log = new Log(logDir, config, recoveryPoint = 0L, false, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages)
       assertEquals(i, log.read(i, 100, None).head.offset)
@@ -378,15 +379,15 @@
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, false, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-    
+
     val lastOffset = log.logEndOffset
     val size = log.size
     log.truncateTo(log.logEndOffset) // keep the entire log
@@ -404,7 +405,7 @@
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
     log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
@@ -431,7 +432,7 @@
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
     val config = logConfig.copy(segmentSize = segmentSize)
-    val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+    val log = new Log(logDir, config, recoveryPoint = 0L, false, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
     for (i<- 1 to msgPerSeg)
       log.append(set)
@@ -455,23 +456,24 @@
   def testBogusIndexSegmentsAreRemoved() {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
-    
+
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                     maxIndexSize = 1000, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                     maxIndexSize = 1000,
                                      indexInterval = 1),
                       recoveryPoint = 0L,
+                      false,
                       time.scheduler,
                       time)
-    
+
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
-    
+
     // check that we can append to the log
     for(i <- 0 until 10)
       log.append(set)
-    
+
     log.delete()
   }
@@ -481,31 +483,33 @@
   @Test
   def testReopenThenTruncate() {
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                maxIndexSize = 1000, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                maxIndexSize = 1000,
                                 indexInterval = 10000)
 
     // create a log
-    var log = new Log(logDir, 
+    var log = new Log(logDir,
                       config,
                       recoveryPoint = 0L,
+                      false,
                       time.scheduler,
                       time)
-    
+
     // add enough messages to roll over several segments then close and re-open and attempt to truncate
     for(i <- 0 until 100)
       log.append(set)
     log.close()
-    log = new Log(logDir, 
+    log = new Log(logDir,
                   config,
                   recoveryPoint = 0L,
+                  false,
                   time.scheduler,
                   time)
     log.truncateTo(3)
     assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
   }
-  
+
   /**
    * Test that deleted files are deleted after the appropriate time.
    */
@@ -513,38 +517,39 @@
   def testAsyncDelete() {
     val set = TestUtils.singleMessageSet("test".getBytes())
     val asyncDeleteMs = 1000
-    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                fileDeleteDelayMs = asyncDeleteMs, 
-                                maxIndexSize = 1000, 
+    val config = logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                fileDeleteDelayMs = asyncDeleteMs,
+                                maxIndexSize = 1000,
                                 indexInterval = 10000)
     val log = new Log(logDir,
                       config,
-                      recoveryPoint = 0L, 
+                      recoveryPoint = 0L,
+                      false,
                       time.scheduler,
                       time)
-    
+
     // append some messages to create some segments
     for(i <- 0 until 100)
       log.append(set)
-    
+
     // files should be renamed
     val segments = log.logSegments.toArray
     val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
     log.deleteOldSegments((s) => true)
-    
+
     assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
-    assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && 
+    assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) &&
                segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix)))
     assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) &&
                segments.forall(_.index.file.exists))
     assertTrue("The original file should be gone.", oldFiles.forall(!_.exists))
-    
+
     // when enough time passes the files should be deleted
     val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file)
     time.sleep(asyncDeleteMs + 1)
     assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists))
   }
-  
+
   /**
    * Any files ending in .deleted should be removed when the log is re-opened.
    */
@@ -555,29 +560,32 @@
     var log = new Log(logDir,
                       config,
                       recoveryPoint = 0L,
+                      false,
                       time.scheduler,
                       time)
-    
+
     // append some messages to create some segments
     for(i <- 0 until 100)
       log.append(set)
-    
+
     log.deleteOldSegments((s) => true)
     log.close()
-    
-    log = new Log(logDir, 
+
+    log = new Log(logDir,
                   config,
                   recoveryPoint = 0L,
+                  false,
                   time.scheduler,
                   time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
-  
+
   @Test
   def testAppendMessageWithNullPayload() {
     var log = new Log(logDir,
                       LogConfig(),
                       recoveryPoint = 0L,
+                      false,
                       time.scheduler,
                       time)
     log.append(new ByteBufferMessageSet(new Message(bytes = null)))
@@ -585,9 +593,9 @@ class LogTest extends JUnitSuite {
     assertEquals(0, ms.head.offset)
     assertTrue("Message payload should be null.", ms.head.message.isNull)
   }
-  
+
   @Test
-  def testCorruptLog() { 
+  def testCorruptLog() {
     // append some messages to create some segments
     val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
     val set = TestUtils.singleMessageSet("test".getBytes())
@@ -597,6 +605,7 @@ class LogTest extends JUnitSuite {
       var log = new Log(logDir,
                         config,
                         recoveryPoint = 0L,
+                        false,
                         time.scheduler,
                         time)
       for(i <- 0 until 100)
@@ -607,15 +616,50 @@ class LogTest extends JUnitSuite {
       val filePosition = messages.searchFor(recoveryPoint, 0).position
       val indexPosition = index.lookup(recoveryPoint).position
       log.close()
-      
+
       // corrupt file
       TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
       TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
-      
+
       // attempt recovery
-      log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
+      log = new Log(logDir, config, recoveryPoint, false, time.scheduler, time)
+      assertEquals(recoveryPoint, log.logEndOffset)
+    }
+  }
+
+  @Test
+  def testCleanShutdownFile() {
+    // append some messages to create some segments
+    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    for(iteration <- 0 until 10) {
+      var recoveryPoint = 50L
+      // create a log and write some messages to it
+      var log = new Log(logDir,
+                        config,
+                        recoveryPoint = 0L,
+                        false,
+                        time.scheduler,
+                        time)
+      for(i <- 0 until 100)
+        log.append(set)
+      val seg = log.logSegments(0, recoveryPoint).last
+      val index = seg.index
+      val messages = seg.log
+      val filePosition = messages.searchFor(recoveryPoint, 0).position
+      val indexPosition = index.lookup(recoveryPoint).position
+      log.close()
+
+      // corrupt file
+      TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
+      TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
+
+      // check if recovery was attempted. Even if the file is corrupted, recovery should not be attempted as the
+      // clean shutdown file exists. This is only for testing purposes. In reality, if a 0.8 broker did not shutdown
+      // cleanly, there will be no .kafka_cleanshutdown file and on upgrading to 0.8.1, it will recover the log properly
+      recoveryPoint = log.logEndOffset
+      log = new Log(logDir, config, 0L, true, time.scheduler, time)
       assertEquals(recoveryPoint, log.logEndOffset)
     }
   }
-  
 }
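
The marker-file protocol this patch implements is compact enough to restate outside the diff. Below is a minimal, self-contained Scala sketch of that protocol; only the ".kafka_cleanshutdown" file name and the skip-recovery check are taken from the patch. CleanShutdownMarkerSketch, openLog, and the "data" directory are hypothetical illustration names, not Kafka's API.

// Sketch of the clean-shutdown marker protocol, under the assumptions above.
import java.io.File

object CleanShutdownMarkerSketch {
  val CleanShutdownFile = ".kafka_cleanshutdown"   // marker name from the patch

  // Mirrors recoverLog(): if the previous shutdown is known to have been
  // clean, trust the on-disk segments and skip the expensive recovery scan.
  def openLog(logDir: File, cleanShutdownFileExists: Boolean): Unit = {
    val needsRecovery = !cleanShutdownFileExists
    if(!needsRecovery) {
      println("Found clean shutdown file. Skipping recovery for log '%s'".format(logDir.getName))
      return
    }
    println("Recovering log '%s' from its recovery point".format(logDir.getName))
    // ...replay messages past the recovery point and rebuild indexes here...
  }

  def main(args: Array[String]): Unit = {
    val dataDir = new File("data")                 // hypothetical data directory
    dataDir.mkdirs()
    val marker = new File(dataDir, CleanShutdownFile)

    // Startup: sample the marker once per data directory, open every log with
    // that answer, then delete the marker (the same order LogManager uses).
    val wasCleanShutdown = marker.exists()
    for(logDir <- dataDir.listFiles() if logDir.isDirectory)
      openLog(logDir, wasCleanShutdown)
    marker.delete()

    // Clean shutdown: recreate the marker only after all logs are flushed and
    // closed; the 0.8 broker wrote this file as the last step of shutdown.
    marker.createNewFile()
  }
}

Note the ordering the sketch preserves: the marker is read before any log is opened and deleted immediately afterwards, so a crash between this startup and the next clean shutdown leaves no marker behind and forces a real recovery pass on the following boot.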