diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0cc402b..a0e1b11 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -165,6 +165,12 @@ class Log(val dir: File,
   private def recoverLog() {
     val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
+    // a clean shutdown file in the data directory means the broker was shut down cleanly; skip recovery
+    val cleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile)
+    if(cleanShutdownFile.exists()) {
+      this.recoveryPoint = lastOffset
+      return
+    }
     if(lastOffset <= this.recoveryPoint) {
       info("Log '%s' is fully intact, skipping recovery".format(name))
       this.recoveryPoint = lastOffset
       return
@@ -697,6 +703,11 @@ object Log {
   /** A temporary file used when swapping files into the log */
   val SwapFileSuffix = ".swap"
 
+  /** Clean shutdown file that indicates the broker was cleanly shut down in 0.8. This is required to maintain
+    * backwards compatibility with 0.8 and to avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1.
+    * TODO: get rid of CleanShutdownFile in 0.8.2 */
+  val CleanShutdownFile = ".kafka_cleanshutdown"
+
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d489e08..390b759 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -109,8 +109,11 @@ class LogManager(val logDirs: Array[File],
       /* load the logs */
       val subDirs = dir.listFiles()
       if(subDirs != null) {
+        val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
+        if(cleanShutdownFile.exists())
+          info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
         for(dir <- subDirs) {
-          if(dir.isDirectory){
+          if(dir.isDirectory) {
             info("Loading log '" + dir.getName + "'")
             val topicPartition = parseTopicPartitionName(dir.getName)
             val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
@@ -124,6 +127,7 @@ class LogManager(val logDirs: Array[File],
               throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
           }
         }
+        cleanShutdownFile.delete()
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 140317c..0b516f9 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -617,5 +617,33 @@ class LogTest extends JUnitSuite {
       assertEquals(recoveryPoint, log.logEndOffset)
     }
   }
-
+
+  @Test
+  def testCleanShutdownFile() {
+    // append some messages to create some segments
+    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val parentLogDir = logDir.getParentFile
+    assertTrue("Data directory %s must exist".format(parentLogDir.getAbsolutePath), parentLogDir.isDirectory)
+    val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
+    cleanShutdownFile.createNewFile()
+    assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
+    var recoveryPoint = 0L
+    // create a log and write some messages to it
+    var log = new Log(logDir,
+                      config,
+                      recoveryPoint = 0L,
+                      time.scheduler,
+                      time)
+    for(i <- 0 until 100)
+      log.append(set)
+    log.close()
+
+    // check that recovery is not attempted: even though the recovery point passed to the second Log is 0L,
+    // recovery must be skipped because the clean shutdown file exists
+    recoveryPoint = log.logEndOffset
+    log = new Log(logDir, config, 0L, time.scheduler, time)
+    assertEquals(recoveryPoint, log.logEndOffset)
+    cleanShutdownFile.delete()
+  }
 }
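
For reviewers, the startup decision after this patch distills to a single predicate: recovery runs only when the clean shutdown marker is absent *and* the log end offset is past the last checkpointed recovery point. A minimal sketch of that logic; the function name `needsRecovery` and its parameters are illustrative, not part of the patch:

```scala
// Illustrative distillation of recoverLog()'s short-circuit, not the patch's API.
// cleanShutdown: whether .kafka_cleanshutdown existed in the data directory
// lastOffset:    the log's end offset (activeSegment.nextOffset, or -1L on error)
// recoveryPoint: the last flushed offset read from the recovery-point checkpoint
def needsRecovery(cleanShutdown: Boolean, lastOffset: Long, recoveryPoint: Long): Boolean =
  !cleanShutdown && lastOffset > recoveryPoint
```

In both skip cases `recoveryPoint` is advanced to the log end offset, so the next checkpoint write records the log as fully flushed.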
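The 0.8 code that *writes* the marker is not part of this diff. For context, a hedged sketch of what the shutdown side is assumed to do; the object `CleanShutdownMarker` and method `markCleanShutdown` are hypothetical names, not from the patch:

```scala
import java.io.File

// Hypothetical sketch, not from this patch: on a clean shutdown the 0.8 broker
// touches the marker file in every data directory. LogManager.loadLogs() above
// deletes it after loading the logs, so the marker is consumed exactly once and
// a later unclean shutdown cannot be masked by a stale file.
object CleanShutdownMarker {
  val CleanShutdownFile = ".kafka_cleanshutdown" // must match Log.CleanShutdownFile

  def markCleanShutdown(logDirs: Seq[File]): Unit = {
    for (dir <- logDirs) {
      val marker = new File(dir, CleanShutdownFile)
      marker.createNewFile() // no-op (returns false) if the file already exists
    }
  }
}
```

Deleting the marker after load rather than before is deliberate: if the broker crashes while loading, the file is still present on the next start and recovery is again skipped only for logs that were in fact cleanly shut down.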