From 2d0ea5975ec1b702fc31220c78cb67e9d892a832 Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Thu, 14 Nov 2013 12:37:34 -0800
Subject: [PATCH] KAFKA-1112 Recovery not occurring on corrupt indexes.

Conflicts:
	core/src/main/scala/kafka/log/Log.scala
---
 core/src/main/scala/kafka/log/Log.scala          | 26 +++++++-------
 core/src/main/scala/kafka/log/LogSegment.scala   | 21 ++++++++++++
 core/src/main/scala/kafka/log/OffsetIndex.scala  | 40 +++++++++++++---------
 core/src/test/scala/unit/kafka/log/LogTest.scala | 23 +++++++------
 .../test/scala/unit/kafka/utils/TestUtils.scala  | 10 ++++--
 5 files changed, 79 insertions(+), 41 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9205128..5f60bcb 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -155,26 +155,23 @@ class Log(val dir: File,
       activeSegment.index.resize(config.maxIndexSize)
     }
 
-    // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset.
-    for (s <- logSegments) {
-      require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
-              "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-              .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
-    }
+    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
+    for (s <- logSegments)
+      s.index.sanityCheck()
   }
 
   private def recoverLog() {
-    val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
-    val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists()
-    if(!needsRecovery) {
-      this.recoveryPoint = lastOffset
+    // if we are doing an upgrade for the first time and have the clean shutdown marker, skip recovery
+    if(hasLegacyCleanShutdownFile) {
+      this.recoveryPoint = activeSegment.nextOffset
       return
     }
-    if(lastOffset <= this.recoveryPoint) {
+    // otherwise check the last segment and see if it ends at the recovery point
+    if(!activeSegment.needsRecovery(this.recoveryPoint)) {
       info("Log '%s' is fully intact, skipping recovery".format(name))
-      this.recoveryPoint = lastOffset
       return
     }
+    // okay, we need to actually recover this log
     val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
     while(unflushed.hasNext) {
       val curr = unflushed.next
@@ -196,6 +193,11 @@ class Log(val dir: File,
       }
     }
   }
+
+  /**
+   * Check if we have the legacy "clean shutdown" file from 0.8.0
+   */
+  private def hasLegacyCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
 
   /**
    * The number of segments in the log.
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 0d6926e..739726d 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -213,6 +213,26 @@ class LogSegment(val log: FileMessageSet,
   }
 
   /**
+   * Do a quick check whether we need to run recovery. If the recovery point is <= the last offset in the log
+   * then we allow recovery to be skipped. We have to be careful because the index could be arbitrarily corrupt.
+   */
+  def needsRecovery(recoveryPoint: Long): Boolean = {
+    val entry = index.readLastEntry
+    if(entry.offset < 0 || entry.position < 0) {
+      // a negative offset implies certain corruption in the index
+      true
+    } else if(entry.offset - this.baseOffset == 0 && index.entries > 0) {
+      // a zero entry in the index, unless it is the first entry, is invalid
+      true
+    } else {
+      // what we got from the index may still be corrupt, but if it leads us to a final offset
+      // that is exactly before the recovery point then the full log was sync'd and we can skip recovery
+      val op = log.searchFor(recoveryPoint-1, entry.position)
+      op == null || op.offset != recoveryPoint-1
+    }
+  }
+
+  /**
    * Calculate the offset that would be used for the next message to be append to this segment.
    * Note that this is expensive.
    */
@@ -285,4 +305,5 @@ class LogSegment(val log: FileMessageSet,
     log.file.setLastModified(ms)
     index.file.setLastModified(ms)
   }
+
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 2f4e303..8a62dfa 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -69,12 +69,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
         raf.setLength(roundToExactMultiple(maxIndexSize, 8))
       }
 
-      val len = raf.length()
-      if(len < 0 || len % 8 != 0)
-        throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len +
-                                        " bytes which is not positive or not a multiple of 8.")
-
       /* memory-map the file */
+      val len = raf.length()
       val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
 
       /* set the position in the index for the next entry */
@@ -99,22 +95,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   var maxEntries = mmap.limit / 8
 
   /* the last offset in the index */
-  var lastOffset = readLastOffset()
+  var lastOffset = readLastEntry.offset
 
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
 
   /**
-   * The last offset written to the index
+   * The last entry in the index
    */
-  private def readLastOffset(): Long = {
+  def readLastEntry(): OffsetPosition = {
     inLock(lock) {
-      val offset =
-        size.get match {
-          case 0 => 0
-          case s => relativeOffset(this.mmap, s-1)
-        }
-      baseOffset + offset
+      size.get match {
+        case 0 => OffsetPosition(baseOffset, 0)
+        case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
+      }
     }
   }
 
@@ -179,7 +173,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /* return the nth offset relative to the base offset */
   private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
 
-  /* return the nth physical offset */
+  /* return the nth physical position */
  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
 
   /**
@@ -258,7 +252,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     inLock(lock) {
       this.size.set(entries)
       mmap.position(this.size.get * 8)
-      this.lastOffset = readLastOffset
+      this.lastOffset = readLastEntry.offset
     }
   }
 
@@ -351,6 +345,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   }
 
   /**
+   * Do a basic sanity check on this index to detect obvious problems
+   * @throws IllegalArgumentException if any problems are found
+   */
+  def sanityCheck() {
+    require(entries == 0 || lastOffset > baseOffset,
+            "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+            .format(file.getAbsolutePath, lastOffset, baseOffset))
+    val len = file.length()
+    require(len % 8 == 0,
+            "Index file " + file.getName + " is corrupt, found " + len +
+            " bytes which is not a multiple of 8.")
+  }
+
+  /**
    * Round a number to the greatest exact multiple of the given factor less than the given number.
    * E.g. roundToExactMultiple(67, 8) == 64
    */
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1571f1e..0ad85fa 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -592,29 +592,30 @@ class LogTest extends JUnitSuite {
     val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
     val set = TestUtils.singleMessageSet("test".getBytes())
     val recoveryPoint = 50L
-    for(iteration <- 0 until 10) {
+    for(iteration <- 0 until 50) {
+      println("Iteration " + iteration)
       // create a log and write some messages to it
+      logDir.mkdirs()
       var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
-      for(i <- 0 until 100)
+      val numMessages = 50 + TestUtils.random.nextInt(50)
+      for(i <- 0 until numMessages)
         log.append(set)
-      val seg = log.logSegments(0, recoveryPoint).last
-      val index = seg.index
-      val messages = seg.log
-      val filePosition = messages.searchFor(recoveryPoint, 0).position
-      val indexPosition = index.lookup(recoveryPoint).position
+      val messages = log.logSegments.flatMap(_.log.iterator.toList)
      log.close()
 
-      // corrupt file
-      TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition)
-      TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition)
+      // corrupt index and log by appending random bytes
+      TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1)
+      TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1)
 
       // attempt recovery
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
-      assertEquals(recoveryPoint, log.logEndOffset)
+      assertEquals(numMessages, log.logEndOffset)
+      assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
+      Utils.rm(logDir)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 777b315..d88b6c3 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -518,9 +518,15 @@ object TestUtils extends Logging {
   def writeNonsenseToFile(fileName: File, position: Long, size: Int) {
     val file = new RandomAccessFile(fileName, "rw")
     file.seek(position)
-    val rand = new Random
     for(i <- 0 until size)
-      file.writeByte(rand.nextInt(255))
+      file.writeByte(random.nextInt(255))
+    file.close()
+  }
+
+  def appendNonsenseToFile(fileName: File, size: Int) {
+    val file = new FileOutputStream(fileName, true)
+    for(i <- 0 until size)
+      file.write(random.nextInt(255))
     file.close()
   }
-- 
1.7.12.4 (Apple Git-37)
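
For reviewers, a minimal standalone sketch of the recovery-skip decision that
LogSegment.needsRecovery above implements. IndexEntry, scanForOffset,
RecoveryCheckSketch, and the toy log in main are hypothetical stand-ins for
kafka.log.OffsetPosition, FileMessageSet.searchFor, and a real segment; they
are not part of the patch.

// Sketch only: IndexEntry stands in for OffsetPosition, scanForOffset for
// FileMessageSet.searchFor. All names here are hypothetical.
case class IndexEntry(offset: Long, position: Int)

object RecoveryCheckSketch {
  def needsRecovery(baseOffset: Long,
                    lastEntry: IndexEntry,
                    indexEntries: Int,
                    recoveryPoint: Long,
                    scanForOffset: (Long, Int) => Option[IndexEntry]): Boolean = {
    if (lastEntry.offset < 0 || lastEntry.position < 0)
      true // negative values can never be produced by a healthy index
    else if (lastEntry.offset == baseOffset && indexEntries > 0)
      true // a relative offset of zero is only legal when the index is empty
    else
      // the entry may still be garbage, so use it only as a scan hint: skip
      // recovery only if scanning the log from it finds offset recoveryPoint - 1
      scanForOffset(recoveryPoint - 1, lastEntry.position) match {
        case Some(e) if e.offset == recoveryPoint - 1 => false // fully intact
        case _                                        => true  // must re-validate
      }
  }

  def main(args: Array[String]): Unit = {
    // toy "log": offsets 0..99, each at a fake byte position
    val log = (0L until 100L).map(o => IndexEntry(o, o.toInt * 26))
    val scan = (target: Long, from: Int) =>
      log.find(e => e.position >= from && e.offset == target)

    println(needsRecovery(0L, IndexEntry(99L, 99 * 26), 99, 100L, scan)) // false: intact
    println(needsRecovery(0L, IndexEntry(99L, -1), 99, 100L, scan))      // true: corrupt entry
  }
}

The design point worth noting is the final branch: the last index entry is
never trusted on its own, only used as a position hint, and recovery is
skipped only when an independent scan of the log from that position actually
lands on the message just before the recovery point.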