diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index eee0ed3..93b363e 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -192,7 +192,13 @@ private[kafka] class Log(val dir: File, if(needsRecovery) recoverSegment(logSegments.get(logSegments.size - 1)) } - new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size))) + + val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size)) + segmentList.foreach(s => require(s.index.entries == 0 || s.index.lastOffset > s.index.baseOffset, + "Corrupt index found, index file (%s) has non-zero size but last offset is %d and the base offset is %d" + .format(s.index.file.getAbsolutePath, s.index.lastOffset, s.index.baseOffset))) + + new SegmentList(segmentList) } /** diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 0d67242..36a1080 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -92,7 +92,6 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) - require(entries == 0 || lastOffset > this.baseOffset, "Corrupt index found, index file (%s) has non-zero size but last offset is %d.".format(file.getAbsolutePath, lastOffset)) /* the maximum number of entries this index can hold */ def maxEntries = mmap.limit / 8 @@ -134,7 +133,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = val relOffset = targetOffset - baseOffset // check if the index is empty - if(entries == 0) + if(entries == 0 || lastOffset <= baseOffset) return -1 // check if the target offset is smaller than the least offset