From 540dff97f2a7f1b1e04b64d0fcb44dda8e3e2011 Mon Sep 17 00:00:00 2001
From: Jay Kreps
Date: Thu, 14 Nov 2013 12:37:34 -0800
Subject: [PATCH] KAFKA-1112 Recovery not occurring on corrupt indexes.

---
 core/src/main/scala/kafka/log/FileMessageSet.scala |  2 +
 core/src/main/scala/kafka/log/Log.scala            | 26 ++++++------
 core/src/main/scala/kafka/log/LogSegment.scala     | 46 ++++++++++++++++++++++
 core/src/main/scala/kafka/log/OffsetIndex.scala    | 40 +++++++++++--------
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  6 +--
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 22 +++++------
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 10 ++++-
 7 files changed, 108 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 6c099da..e1f8b97 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -123,6 +123,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
       if(offset >= targetOffset)
         return OffsetPosition(offset, position)
       val messageSize = buffer.getInt()
+      if(messageSize < Message.MessageOverhead)
+        throw new IllegalStateException("Invalid message size: " + messageSize)
       position += MessageSet.LogOverhead + messageSize
     }
     null
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 9205128..5f60bcb 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -155,26 +155,23 @@ class Log(val dir: File,
       activeSegment.index.resize(config.maxIndexSize)
     }
 
-    // sanity check the index file of every segment, if it's empty or its last offset is greater than its base offset.
-    for (s <- logSegments) {
-      require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset,
-              "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-              .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))
-    }
+    // sanity check the index file of every segment to ensure we don't proceed with a corrupt segment
+    for (s <- logSegments)
+      s.index.sanityCheck()
  }
 
  private def recoverLog() {
-    val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
-    val needsRecovery = !(new File(dir.getParentFile, CleanShutdownFile)).exists()
-    if(!needsRecovery) {
-      this.recoveryPoint = lastOffset
+    // if we are doing an upgrade for the first time and have the clean shutdown marker, skip recovery
+    if(hasLegacyCleanShutdownFile) {
+      this.recoveryPoint = activeSegment.nextOffset
       return
     }
-    if(lastOffset <= this.recoveryPoint) {
+    // otherwise check the last segment and see if it ends at the recovery point
+    if(!activeSegment.needsRecovery(this.recoveryPoint)) {
       info("Log '%s' is fully intact, skipping recovery".format(name))
-      this.recoveryPoint = lastOffset
       return
     }
+    // okay we need to actually recover this log
     val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
     while(unflushed.hasNext) {
       val curr = unflushed.next
@@ -196,6 +193,11 @@ class Log(val dir: File,
       }
     }
   }
+
+  /**
+   * Check if we have the legacy "clean shutdown" file from 0.8.0
+   */
+  private def hasLegacyCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
 
   /**
    * The number of segments in the log.
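// ---------------------------------------------------------------------------
// Not part of the patch: a minimal, hypothetical Scala sketch of the gating
// order the new recoverLog() above implements. The marker-file name and the
// `lastOffsetValidAt` callback are stand-ins for Kafka's CleanShutdownFile
// constant and LogSegment.needsRecovery; only the decision order is taken
// from the patch.
// ---------------------------------------------------------------------------
import java.io.File

object RecoveryGateSketch {
  // assumed name of the legacy 0.8.0 marker; the patch only checks existence
  val CleanShutdownFile = ".kafka_cleanshutdown"

  def shouldRecover(logDir: File, recoveryPoint: Long,
                    lastOffsetValidAt: Long => Boolean): Boolean =
    if (new File(logDir.getParentFile, CleanShutdownFile).exists())
      false                              // legacy clean-shutdown marker: trust the log as-is
    else
      !lastOffsetValidAt(recoveryPoint)  // otherwise verify the actual last message, not just the index

  def main(args: Array[String]): Unit = {
    val dir = new File("/tmp/kafka-logs/topic-0") // hypothetical log directory
    // no marker present and a validator that rejects the recovery point => recover
    println(shouldRecover(dir, 50L, _ => false)) // prints true
  }
}
// ---------------------------------------------------------------------------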
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 0d6926e..cb56555 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -213,6 +213,51 @@ class LogSegment(val log: FileMessageSet, } /** + * Do a quick check whether we need to run recovery. If the recovery point is <= the last offset in the log + * then we allow recovery to be skipped. We have to be careful because the index could be arbitrarily corrupt. + */ + def needsRecovery(recoveryPoint: Long): Boolean = { + val entry = index.readLastEntry + if(entry.offset < 0 || entry.position < 0) { + // a negative offset implies certain corruption in the index + true + } else if(entry.offset - this.baseOffset == 0 && index.entries > 0) { + // a zero entry in the index, unless it is the first entry, is invalid + true + } else { + // the position we got from the index may still be corrupt, proceed with caution... + val op = + try { + log.searchFor(recoveryPoint-1, entry.position) + } catch { + // the search can fail in a corrupt log + case e: Exception => return true + } + if(op == null || op.offset != recoveryPoint-1) + return true + + // okay we have found what appears to be the offset we were looking for. Now let's validate that it is a valid message + // and is the final message in the log + val messages = log.read(op.position, log.sizeInBytes - op.position) + val iter = messages.iterator + // if there is no message there, we are corrupt + if(!iter.hasNext) + return true + + val last = iter.next + // if there are any trailing bytes we are corrupt + if(MessageSet.entrySize(last.message) != messages.sizeInBytes) + return true + + // if the crc check of the message fails we are corrupt + if(!last.message.isValid) + return true + + false + } + } + + /** * Calculate the offset that would be used for the next message to be append to this segment. * Note that this is expensive. 
   */
@@ -285,4 +330,5 @@ class LogSegment(val log: FileMessageSet,
     log.file.setLastModified(ms)
     index.file.setLastModified(ms)
   }
+
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 2f4e303..8a62dfa 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -69,12 +69,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
         raf.setLength(roundToExactMultiple(maxIndexSize, 8))
       }
 
-      val len = raf.length()
-      if(len < 0 || len % 8 != 0)
-        throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len +
-                                        " bytes which is not positive or not a multiple of 8.")
-
       /* memory-map the file */
+      val len = raf.length()
       val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
 
       /* set the position in the index for the next entry */
@@ -99,22 +95,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   var maxEntries = mmap.limit / 8
 
   /* the last offset in the index */
-  var lastOffset = readLastOffset()
+  var lastOffset = readLastEntry.offset
 
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
 
   /**
-   * The last offset written to the index
+   * The last entry in the index
    */
-  private def readLastOffset(): Long = {
+  def readLastEntry(): OffsetPosition = {
     inLock(lock) {
-      val offset =
-        size.get match {
-          case 0 => 0
-          case s => relativeOffset(this.mmap, s-1)
-        }
-      baseOffset + offset
+      size.get match {
+        case 0 => OffsetPosition(baseOffset, 0)
+        case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
+      }
     }
   }
@@ -179,7 +173,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /* return the nth offset relative to the base offset */
   private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
 
-  /* return the nth physical offset */
+  /* return the nth physical position */
   private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
 
   /**
@@ -258,7 +252,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     inLock(lock) {
       this.size.set(entries)
       mmap.position(this.size.get * 8)
-      this.lastOffset = readLastOffset
+      this.lastOffset = readLastEntry.offset
     }
   }
@@ -351,6 +345,20 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   }
 
   /**
+   * Do a basic sanity check on this index to detect obvious problems
+   * @throws IllegalArgumentException if any problems are found
+   */
+  def sanityCheck() {
+    require(entries == 0 || lastOffset > baseOffset,
+            "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
+            .format(file.getAbsolutePath, lastOffset, baseOffset))
+    val len = file.length()
+    require(len % 8 == 0,
+            "Index file " + file.getName + " is corrupt, found " + len +
+            " bytes which is not a multiple of 8.")
+  }
+
+  /**
    * Round a number to the greatest exact multiple of the given factor less than the given number.
    * E.g.
roundToExactMultiple(67, 8) == 64 */ diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 5f2c2e8..03c3eea 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -212,16 +212,16 @@ class LogSegmentTest extends JUnit3Suite { */ @Test def testRecoveryWithCorruptMessage() { - val rand = new Random(1) val messagesAppended = 20 for(iteration <- 0 until 10) { val seg = createSegment(0) for(i <- 0 until messagesAppended) seg.append(i, messages(i, i.toString)) - val offsetToBeginCorruption = rand.nextInt(messagesAppended) + val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended) // start corrupting somewhere in the middle of the chosen record all the way to the end - val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + rand.nextInt(15) + val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15) TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position) + assertTrue(seg.needsRecovery(messagesAppended)) seg.recover(64*1024) assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList) seg.delete() diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1571f1e..1da1393 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -592,29 +592,29 @@ class LogTest extends JUnitSuite { val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000) val set = TestUtils.singleMessageSet("test".getBytes()) val recoveryPoint = 50L - for(iteration <- 0 until 10) { + for(iteration <- 0 until 50) { // create a log and write some messages to it + logDir.mkdirs() var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) - for(i <- 0 until 100) + val numMessages = 50 + TestUtils.random.nextInt(50) + for(i <- 0 until numMessages) log.append(set) - val seg = log.logSegments(0, recoveryPoint).last - val index = seg.index - val messages = seg.log - val filePosition = messages.searchFor(recoveryPoint, 0).position - val indexPosition = index.lookup(recoveryPoint).position + val messages = log.logSegments.flatMap(_.log.iterator.toList) log.close() - // corrupt file - TestUtils.writeNonsenseToFile(index.file, indexPosition, index.file.length.toInt - indexPosition) - TestUtils.writeNonsenseToFile(messages.file, filePosition, messages.file.length().toInt - filePosition) + // corrupt index and log by appending random bytes + TestUtils.appendNonsenseToFile(log.activeSegment.index.file, TestUtils.random.nextInt(1024) + 1) + TestUtils.appendNonsenseToFile(log.activeSegment.log.file, TestUtils.random.nextInt(1024) + 1) // attempt recovery log = new Log(logDir, config, recoveryPoint, time.scheduler, time) - assertEquals(recoveryPoint, log.logEndOffset) + assertEquals(numMessages, log.logEndOffset) + assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList)) + Utils.rm(logDir) } } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 777b315..d88b6c3 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -518,9 +518,15 @@ 
object TestUtils extends Logging { def writeNonsenseToFile(fileName: File, position: Long, size: Int) { val file = new RandomAccessFile(fileName, "rw") file.seek(position) - val rand = new Random for(i <- 0 until size) - file.writeByte(rand.nextInt(255)) + file.writeByte(random.nextInt(255)) + file.close() + } + + def appendNonsenseToFile(fileName: File, size: Int) { + val file = new FileOutputStream(fileName, true) + for(i <- 0 until size) + file.write(random.nextInt(255)) file.close() } -- 1.7.12.4 (Apple Git-37)
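Appendix, not part of the patch: a minimal, self-contained Scala sketch of the two invariants the new OffsetIndex.sanityCheck() enforces, namely that the index file length is a multiple of the 8-byte entry size (4-byte relative offset plus 4-byte position) and that a non-empty index has a last offset strictly greater than its base offset. IndexStats is a hypothetical stand-in for the real OffsetIndex state.

object IndexSanitySketch {
  final case class IndexStats(fileLengthBytes: Long, entries: Int,
                              baseOffset: Long, lastOffset: Long)

  def isSane(s: IndexStats): Boolean =
    s.fileLengthBytes % 8 == 0 &&                    // every entry occupies exactly 8 bytes
    (s.entries == 0 || s.lastOffset > s.baseOffset)  // a non-empty index must advance past its base offset

  def main(args: Array[String]): Unit = {
    println(isSane(IndexStats(16, 2, 100, 102))) // true: aligned and advancing
    println(isSane(IndexStats(12, 1, 100, 101))) // false: 12 bytes is not a multiple of 8
    println(isSane(IndexStats(8, 1, 100, 100)))  // false: last offset did not advance past base
  }
}

In the patch itself these checks are require() calls, so a corrupt index fails fast at Log load time rather than silently skipping recovery, which is the bug KAFKA-1112 describes.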