diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5cd36e0..eee0ed3 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -126,6 +126,8 @@ private[kafka] class Log(val dir: File, /* Calculate the offset of the next message */ private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset()) + + debug("Completed load of log %s with log end offset %d".format(name, logEndOffset)) newGauge(name + "-" + "NumLogSegments", new Gauge[Int] { def getValue = numberOfSegments }) @@ -275,8 +277,10 @@ private[kafka] class Log(val dir: File, val lastOffset = offsetCounter.get - 1 (firstOffset, lastOffset) } else { - if(!messageSetInfo.offsetsMonotonic) - throw new IllegalArgumentException("Out of order offsets found in " + messages) + require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages) + require(messageSetInfo.firstOffset >= nextOffset.get, + "Attempt to append a message set beginning with offset %d to a log with log end offset %d." + .format(messageSetInfo.firstOffset, nextOffset.get)) (messageSetInfo.firstOffset, messageSetInfo.lastOffset) } diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 43b3575..0d67242 100644 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -90,8 +90,9 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = /* the last offset in the index */ var lastOffset = readLastOffset() - info("Created index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d" - .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset)) + info("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" + .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position)) + require(entries == 0 || lastOffset > this.baseOffset, "Corrupt index found, index file (%s) has non-zero size but last offset is %d.".format(file.getAbsolutePath, lastOffset)) /* the maximum number of entries this index can hold */ def maxEntries = mmap.limit / 8 @@ -177,15 +178,14 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = */ def append(offset: Long, position: Int) { this synchronized { - if(isFull) - throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").") - if(size.get > 0 && offset <= lastOffset) - throw new IllegalArgumentException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset)) + require(!isFull, "Attempt to append to a full index (size = " + size + ").") + require(size.get == 0 || offset > lastOffset, "Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset)) debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) this.mmap.putInt((offset - baseOffset).toInt) this.mmap.putInt(position) this.size.incrementAndGet() this.lastOffset = offset + require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") } } @@ -230,7 +230,11 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from * the file. */ - def trimToValidSize() = resize(entries * 8) + def trimToValidSize() { + this synchronized { + resize(entries * 8) + } + } /** * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in @@ -245,7 +249,9 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int = val roundedNewSize = roundToExactMultiple(newSize, 8) try { raf.setLength(roundedNewSize) + val position = this.mmap.position this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) + this.mmap.position(position) } finally { Utils.swallow(raf.close()) } diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index 952a40b..051ebe3 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -86,7 +86,7 @@ class OffsetIndexTest extends JUnitSuite { val offset = idx.baseOffset + i + 1 idx.append(offset, i) } - assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException]) + assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalArgumentException]) } @Test(expected = classOf[IllegalArgumentException]) @@ -105,7 +105,9 @@ class OffsetIndexTest extends JUnitSuite { val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset) assertEquals(first, idxRo.lookup(first.offset)) assertEquals(sec, idxRo.lookup(sec.offset)) - assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException]) + assertEquals(sec.offset, idxRo.lastOffset) + assertEquals(2, idxRo.entries) + assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalArgumentException]) } @Test