diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0cc402b..a0e1b11 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -165,6 +165,12 @@ class Log(val dir: File,
 
   private def recoverLog() {
     val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
+    val cleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile)
+    val needsRecovery = !cleanShutdownFile.exists()
+    if(!needsRecovery) {
+      this.recoveryPoint = lastOffset
+      return
+    }
     if(lastOffset <= this.recoveryPoint) {
       info("Log '%s' is fully intact, skipping recovery".format(name))
       this.recoveryPoint = lastOffset
@@ -697,6 +703,11 @@ object Log {
   /** A temporary file used when swapping files into the log */
   val SwapFileSuffix = ".swap"
 
+  /** Clean shutdown file that indicates the broker was cleanly shut down in 0.8. This is required to maintain backwards
+   *  compatibility with 0.8 and to avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1.
+   *  TODO: get rid of CleanShutdownFile in 0.8.2 */
+  val CleanShutdownFile = ".kafka_cleanshutdown"
+
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d489e08..390b759 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -109,8 +109,11 @@ class LogManager(val logDirs: Array[File],
     /* load the logs */
     val subDirs = dir.listFiles()
     if(subDirs != null) {
+      val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
+      if(cleanShutDownFile.exists())
+        info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
       for(dir <- subDirs) {
-        if(dir.isDirectory){
+        if(dir.isDirectory) {
           info("Loading log '" + dir.getName + "'")
           val topicPartition = parseTopicPartitionName(dir.getName)
           val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
@@ -124,6 +127,7 @@ class LogManager(val logDirs: Array[File],
             throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
           }
         }
+        cleanShutDownFile.delete()
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 140317c..ecc120a 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -64,10 +64,10 @@ class LogTest extends JUnitSuite {
     val set = TestUtils.singleMessageSet("test".getBytes())
 
     // create a log
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentMs = 1 * 60 * 60L), 
-                      recoveryPoint = 0L, 
-                      scheduler = time.scheduler, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentMs = 1 * 60 * 60L),
+                      recoveryPoint = 0L,
+                      scheduler = time.scheduler,
                       time = time)
     assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments)
     time.sleep(log.config.segmentMs + 1)
@@ -127,7 +127,7 @@ def testAppendAndReadWithSequentialOffsets() {
     val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
     val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
-    
+
     for(i <- 0 until messages.length)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
     for(i <- 0 until messages.length) {
@@ -137,7 +137,7 @@
     }
     assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
   }
-  
+
   /**
    * This test appends a bunch of messages with non-sequential offsets and checks that we can read the correct message
    * from any offset less than the logEndOffset including offsets not appended.
@@ -147,7 +147,7 @@
     val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time)
     val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
     val messages = messageIds.map(id => new Message(id.toString.getBytes))
-    
+
     // now test the case that we give the offsets and use non-sequential offsets
     for(i <- 0 until messages.length)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
@@ -158,27 +158,27 @@
       assertEquals("Message should match appended.", messages(idx), read.message)
     }
   }
-  
+
   /**
    * This test covers an odd case where we have a gap in the offsets that falls at the end of a log segment.
    * Specifically we create a log where the last message in the first segment has offset 0. If we
-   * then read offset 1, we should expect this read to come from the second segment, even though the 
+   * then read offset 1, we should expect this read to come from the second segment, even though the
    * first segment has the greatest lower bound on the offset.
    */
   @Test
   def testReadAtLogGap() {
     val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time)
-    
+
     // keep appending until we have two segments with only a single message in the second segment
     while(log.numberOfSegments == 1)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes))) 
-    
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
-    
+
     assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
   }
-  
+
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set
@@ -226,13 +226,13 @@
     }
     val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
     assertEquals("Should be no more messages", 0, lastRead.size)
-    
+
     // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
     TestUtils.retry(1000L){
       assertTrue("Log role should have forced flush", log.recoveryPoint >= log.activeSegment.baseOffset)
     }
   }
-  
+
   /**
    * Test reads at offsets that fall within compressed message set boundaries.
    */
@@ -240,20 +240,20 @@
   def testCompressedMessages() {
     /* this log should roll after every messageset */
     val log = new Log(logDir, logConfig.copy(segmentSize = 10), recoveryPoint = 0L, time.scheduler, time = time)
-    
+
     /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
-    
+
     def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
-    
+
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
     assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset)
     assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset)
     assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset)
   }
-  
+
   /**
    * Test garbage collecting old segments
   */
@@ -265,7 +265,7 @@
      val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time)
      for(i <- 0 until messagesToAppend)
        log.append(TestUtils.singleMessageSet(i.toString.getBytes))
-      
+
      var currOffset = log.logEndOffset
      assertEquals(currOffset, messagesToAppend)
 
@@ -276,17 +276,17 @@
      assertEquals("We should still have one segment left", 1, log.numberOfSegments)
      assertEquals("Further collection shouldn't delete anything", 0, log.deleteOldSegments(_ => true))
      assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
-      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append", 
+      assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                   currOffset, log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
-      
+
      // cleanup the log
      log.delete()
    }
  }
 
  /**
-  * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the 
+  * We have a max size limit on message appends, check that it is properly enforced by appending a message larger than the
   * setting and checking that an exception is thrown.
   */
  @Test
@@ -308,7 +308,7 @@
       case e: MessageSizeTooLargeException => // this is good
     }
   }
-  
+
   /**
    * Append a bunch of messages to a log and then re-open it both with and without recovery and check that the log re-initializes correctly.
    */
@@ -327,13 +327,13 @@
     val numIndexEntries = log.activeSegment.index.entries
     val lastOffset = log.logEndOffset
     log.close()
-    
+
     log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
     assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
-    
+
     // test recovery case
     log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
@@ -341,7 +341,7 @@
     assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
     log.close()
   }
-  
+
   /**
    * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
    */
@@ -355,12 +355,12 @@
       log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
     val indexFiles = log.logSegments.map(_.index.file)
     log.close()
-    
+
     // delete all the index files
     indexFiles.foreach(_.delete())
-    
+
     // reopen the log
-    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) 
+    log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages)
       assertEquals(i, log.read(i, 100, None).head.offset)
@@ -383,10 +383,10 @@
 
     for (i<- 1 to msgPerSeg)
       log.append(set)
-    
+
     assertEquals("There should be exactly 1 segments.", 1, log.numberOfSegments)
     assertEquals("Log end offset should be equal to number of messages", msgPerSeg, log.logEndOffset)
-    
+
     val lastOffset = log.logEndOffset
     val size = log.size
     log.truncateTo(log.logEndOffset) // keep the entire log
@@ -404,7 +404,7 @@
 
     for (i<- 1 to msgPerSeg)
      log.append(set)
-    
+
     assertEquals("Should be back to original offset", log.logEndOffset, lastOffset)
     assertEquals("Should be back to original size", log.size, size)
     log.truncateFullyAndStartAt(log.logEndOffset - (msgPerSeg - 1))
@@ -455,23 +455,23 @@
   def testBogusIndexSegmentsAreRemoved() {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
-    
+
     val set = TestUtils.singleMessageSet("test".getBytes())
-    val log = new Log(logDir, 
-                      logConfig.copy(segmentSize = set.sizeInBytes * 5, 
-                                     maxIndexSize = 1000, 
+    val log = new Log(logDir,
+                      logConfig.copy(segmentSize = set.sizeInBytes * 5,
+                                     maxIndexSize = 1000,
                                      indexInterval = 1),
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
-    
+
assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0) assertFalse("The second index file should have been deleted.", bogusIndex2.exists) - + // check that we can append to the log for(i <- 0 until 10) log.append(set) - + log.delete() } @@ -481,22 +481,22 @@ class LogTest extends JUnitSuite { @Test def testReopenThenTruncate() { val set = TestUtils.singleMessageSet("test".getBytes()) - val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, - maxIndexSize = 1000, + val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, + maxIndexSize = 1000, indexInterval = 10000) // create a log - var log = new Log(logDir, + var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) - + // add enough messages to roll over several segments then close and re-open and attempt to truncate for(i <- 0 until 100) log.append(set) log.close() - log = new Log(logDir, + log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, @@ -505,7 +505,7 @@ class LogTest extends JUnitSuite { assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments) assertEquals("Log end offset should be 3.", 3, log.logEndOffset) } - + /** * Test that deleted files are deleted after the appropriate time. */ @@ -513,38 +513,38 @@ class LogTest extends JUnitSuite { def testAsyncDelete() { val set = TestUtils.singleMessageSet("test".getBytes()) val asyncDeleteMs = 1000 - val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, - fileDeleteDelayMs = asyncDeleteMs, - maxIndexSize = 1000, + val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, + fileDeleteDelayMs = asyncDeleteMs, + maxIndexSize = 1000, indexInterval = 10000) val log = new Log(logDir, config, - recoveryPoint = 0L, + recoveryPoint = 0L, time.scheduler, time) - + // append some messages to create some segments for(i <- 0 until 100) log.append(set) - + // files should be renamed val segments = log.logSegments.toArray val oldFiles = segments.map(_.log.file) ++ segments.map(_.index.file) log.deleteOldSegments((s) => true) - + assertEquals("Only one segment should remain.", 1, log.numberOfSegments) - assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && + assertTrue("All log and index files should end in .deleted", segments.forall(_.log.file.getName.endsWith(Log.DeletedFileSuffix)) && segments.forall(_.index.file.getName.endsWith(Log.DeletedFileSuffix))) assertTrue("The .deleted files should still be there.", segments.forall(_.log.file.exists) && segments.forall(_.index.file.exists)) assertTrue("The original file should be gone.", oldFiles.forall(!_.exists)) - + // when enough time passes the files should be deleted val deletedFiles = segments.map(_.log.file) ++ segments.map(_.index.file) time.sleep(asyncDeleteMs + 1) assertTrue("Files should all be gone.", deletedFiles.forall(!_.exists)) } - + /** * Any files ending in .deleted should be removed when the log is re-opened. 
    */
@@ -557,22 +557,22 @@
                   recoveryPoint = 0L,
                   time.scheduler,
                   time)
-    
+
     // append some messages to create some segments
     for(i <- 0 until 100)
       log.append(set)
-    
+
     log.deleteOldSegments((s) => true)
     log.close()
-    
-    log = new Log(logDir, 
+
+    log = new Log(logDir,
                   config,
                   recoveryPoint = 0L,
                   time.scheduler,
                   time)
     assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
   }
-  
+
   @Test
   def testAppendMessageWithNullPayload() {
     var log = new Log(logDir,
@@ -585,9 +585,9 @@
     assertEquals(0, ms.head.offset)
     assertTrue("Message payload should be null.", ms.head.message.isNull)
   }
-  
+
   @Test
-  def testCorruptLog() { 
+  def testCorruptLog() {
     // append some messages to create some segments
     val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
     val set = TestUtils.singleMessageSet("test".getBytes())
@@ -607,15 +607,43 @@
       val filePosition = messages.searchFor(recoveryPoint, 0).position
       val indexPosition = index.lookup(recoveryPoint).position
       log.close()
-      
+
       // corrupt file
       TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
       TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
-      
+
       // attempt recovery
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
      assertEquals(recoveryPoint, log.logEndOffset)
     }
   }
-  
+
+  @Test
+  def testCleanShutdownFile() {
+    // append some messages to create some segments
+    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val parentLogDir = logDir.getParentFile
+    assertTrue("Data directory %s must exist".format(parentLogDir), parentLogDir.isDirectory)
+    val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
+    cleanShutdownFile.createNewFile()
+    assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
+    // create a log and write some messages to it
+    var log = new Log(logDir,
+                      config,
+                      recoveryPoint = 0L,
+                      time.scheduler,
+                      time)
+    for(i <- 0 until 100)
+      log.append(set)
+    log.close()
+
+    // check that recovery was skipped: even though the log is reopened with a recovery point of 0L, the
+    // presence of the clean shutdown file means no recovery is attempted and the log end offset is intact
+    val recoveryPoint = log.logEndOffset
+    log = new Log(logDir, config, 0L, time.scheduler, time)
+    assertEquals(recoveryPoint, log.logEndOffset)
+    cleanShutdownFile.delete()
+  }
 }
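
Reviewer note: the protocol behind this patch is small. The broker drops a marker file in each data directory on clean shutdown; on startup, Log.recoverLog skips recovery when the marker is present, and LogManager deletes the marker after loading so that a later hard kill (e.g. kill -9) is still detected as unclean. The following is a minimal standalone sketch of that marker-file protocol, not the patch itself: only the marker name mirrors Log.CleanShutdownFile, while the object and method names here are hypothetical.

    import java.io.File

    object CleanShutdownSketch {
      // mirrors Log.CleanShutdownFile from the patch
      val markerName = ".kafka_cleanshutdown"

      // on clean shutdown: drop the marker in the data directory
      def markCleanShutdown(dataDir: File): Boolean =
        new File(dataDir, markerName).createNewFile()

      // on startup: recovery is needed only if the marker is absent; the marker
      // is deleted once seen, so the next startup re-evaluates from scratch
      def needsRecovery(dataDir: File): Boolean = {
        val marker = new File(dataDir, markerName)
        val wasClean = marker.exists()
        if (wasClean) marker.delete()
        !wasClean
      }
    }

Deleting the marker during startup is what makes the scheme safe: if the broker later dies without writing a new marker, the next startup finds nothing and falls back to full log recovery.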