commit 6204a60aefe065f4043bd2c114dec3013468157a
Author: Jay Kreps
Date:   Wed Oct 3 13:32:27 2012 -0700

    Address more feedback from Jun.

diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index fa07c64..5249ddc 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -51,13 +51,13 @@ class PartitionTopicInfo(val topic: String,
    * Enqueue a message set for processing
    */
   def enqueue(messages: ByteBufferMessageSet) {
-    val size = messages.validBytes
+    val size = messages.sizeInBytes
     if(size > 0) {
       val next = nextOffset(messages)
-      trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
+      trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       fetchedOffset.set(next)
-      debug("updated fetch offset of ( %s ) to %d".format(this, next))
+      debug("updated fetch offset of (%s) to %d".format(this, next))
       ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
       ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
     }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e5c9e87..a1efe9e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -195,7 +195,7 @@ private[kafka] class Log(val dir: File,
    * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
    */
   private def recoverSegment(segment: LogSegment) {
-    segment.index.truncateTo(0)
+    segment.index.truncate()
     var validBytes = 0
     var lastIndexEntry = 0
     val iter = segment.messageSet.iterator
@@ -203,8 +203,10 @@ private[kafka] class Log(val dir: File,
       while(iter.hasNext) {
         val entry = iter.next
         entry.message.ensureValid()
-        if(validBytes - lastIndexEntry > indexIntervalBytes)
+        if(validBytes - lastIndexEntry > indexIntervalBytes) {
           segment.index.append(entry.offset, validBytes)
+          lastIndexEntry = validBytes
+        }
         validBytes += MessageSet.entrySize(entry.message)
       }
     } catch {
@@ -228,10 +230,8 @@ private[kafka] class Log(val dir: File,
   def close() {
     debug("Closing log " + name)
     lock synchronized {
-      for(seg <- segments.view) {
-        debug("Closing log segment " + seg.start)
-        seg.messageSet.close()
-      }
+      for(seg <- segments.view)
+        seg.close()
     }
   }
@@ -547,11 +547,7 @@ private[kafka] class Log(val dir: File,
   def truncateAndStartWithNewOffset(newOffset: Long) {
     lock synchronized {
       val deletedSegments = segments.trunc(segments.view.size)
-      val logFile = logFilename(dir, newOffset)
-      val indexFile = indexFilename(dir, newOffset)
       debug("Truncate and start log '" + name + "' to " + newOffset)
-      val log = new FileMessageSet(file = logFile, mutable = true)
-      val index = new OffsetIndex(file = indexFile, baseOffset = newOffset, mutable = true, maxIndexSize = maxIndexSize)
       segments.append(new LogSegment(dir,
                                      newOffset,
                                      mutable = true,
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index d9e884c..aaf03e8 100644
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -147,4 +147,5 @@ class LogSegment(val messageSet: FileMessageSet,
     Utils.swallow(index.close)
     Utils.swallow(messageSet.close)
   }
+
 }
\ No newline at end of file
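Aside (not part of the patch): the recoverSegment hunk above fixes a real bug. Because lastIndexEntry was never advanced, the old loop appended an index entry for every message once the first indexIntervalBytes were crossed, instead of one entry per interval. A minimal sketch of the corrected interval logic, assuming messages are given as (offset, sizeInBytes) pairs (the pair representation and the rebuildIndex name are illustrative, not Kafka's API):

    // One index entry is emitted each time more than indexIntervalBytes of
    // valid data have accumulated since the previous entry.
    def rebuildIndex(entries: Seq[(Long, Int)], indexIntervalBytes: Int): Seq[(Long, Int)] = {
      var validBytes = 0
      var lastIndexEntry = 0
      val index = scala.collection.mutable.ArrayBuffer[(Long, Int)]()
      for((offset, size) <- entries) {
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          index += ((offset, validBytes))
          lastIndexEntry = validBytes  // the line the patch adds
        }
        validBytes += size
      }
      index
    }
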
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 93488a7..aa5af1d 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -54,18 +54,26 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
   /* the memory mapping */
   private var mmap: MappedByteBuffer = {
+    val newlyCreated = file.createNewFile()
     val raf = new RandomAccessFile(file, "rw")
     try {
       if(mutable) {
         /* if mutable create and memory map a new sparse file */
         if(maxIndexSize < 8)
           throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
-        file.createNewFile()
-        /* round size to the nearest multiple of 8 */
-        val size = 8*(maxIndexSize/8)
-        raf.setLength(size)
+
+        /* pre-allocate the file if necessary */
+        if(newlyCreated)
+          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
         val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
-        idx.position(0).asInstanceOf[MappedByteBuffer]
+
+        /* set the position in the index for the next entry */
+        if(newlyCreated)
+          idx.position(0)
+        else
+          // if this is a pre-existing index, assume it is all valid and set position to last entry
+          idx.position(roundToExactMultiple(idx.limit, 8))
+        idx
       } else {
         /* if not mutable, just mmap what they gave us */
         val len = raf.length()
@@ -83,10 +91,10 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
   val maxEntries = mmap.limit / 8

   /* the number of entries in the index */
-  private var size = if(mutable) new AtomicInteger(0) else new AtomicInteger(mmap.limit / 8)
+  private var size = if(mutable) new AtomicInteger(mmap.position / 8) else new AtomicInteger(mmap.limit / 8)

   /* the last offset in the index */
-  var lastOffset = if(mutable) baseOffset else readLastOffset()
+  var lastOffset = readLastOffset()

   /**
    * The last logical offset written to the index
@@ -187,19 +195,34 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
   def isFull: Boolean = entries >= this.maxEntries

   /**
-   * Remove all entries from the index which have an offset greater than or equal to the given offset
+   * Truncate the entire index
+   */
+  def truncate() = truncateTo(this.baseOffset)
+
+  /**
+   * Remove all entries from the index which have an offset greater than or equal to the given offset.
+   * Truncating to an offset larger than the largest in the index has no effect.
    */
   def truncateTo(offset: Long) {
     this synchronized {
       val idx = mmap.duplicate
       val slot = indexSlotFor(idx, offset)
-      if(slot > 0) {
-        val found = logical(idx, slot)
-        val newEntries = if(found == offset) slot else slot + 1
-        this.size.set(newEntries)
-        mmap.position(this.size.get * 8)
-        this.lastOffset = readLastOffset
-      }
+
+      /* There are 3 cases for choosing the new size
+       * 1) if there is no entry in the index <= the offset, delete everything
+       * 2) if there is an entry for this exact offset, delete it and everything larger than it
+       * 3) if there is no entry for this offset, delete everything larger than the next smallest
+       */
+      val newEntries =
+        if(slot < 0)
+          0
+        else if(logical(idx, slot) == offset)
+          slot
+        else
+          slot + 1
+      this.size.set(newEntries)
+      mmap.position(this.size.get * 8)
+      this.lastOffset = readLastOffset
     }
   }
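Aside (not part of the patch): the three truncateTo cases can be checked against a plain sorted sequence. Here lastIndexWhere stands in for indexSlotFor, i.e. the largest slot whose offset is <= the target, or -1 if none; this is a sketch of the decision logic, not the mmap-backed implementation:

    // Offsets in the index: 10, 20, 30 (slots 0, 1, 2).
    //   newEntryCount(offsets, 5)  == 0  -- case 1: nothing <= 5, delete everything
    //   newEntryCount(offsets, 20) == 1  -- case 2: exact match, drop it and all after
    //   newEntryCount(offsets, 25) == 2  -- case 3: keep everything <= 25
    //   newEntryCount(offsets, 35) == 3  -- past the end: no effect
    def newEntryCount(offsets: IndexedSeq[Long], target: Long): Int = {
      val slot = offsets.lastIndexWhere(_ <= target)
      if(slot < 0) 0
      else if(offsets(slot) == target) slot
      else slot + 1
    }
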
@@ -245,5 +268,10 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
     if(mutable)
       makeReadOnly()
   }
-
+
+  /**
+   * Round a number to the greatest exact multiple of the given factor less than or equal to the given number.
+   * E.g. roundToExactMultiple(67, 8) == 64
+   */
+  private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/SegmentList.scala b/core/src/main/scala/kafka/log/SegmentList.scala
index d7c744a..5c7b0bd 100644
--- a/core/src/main/scala/kafka/log/SegmentList.scala
+++ b/core/src/main/scala/kafka/log/SegmentList.scala
@@ -95,6 +95,6 @@ private[log] class SegmentList[T](seq: Seq[T])(implicit m: ClassManifest[T]) {
   /**
    * Nicer toString method
    */
-  override def toString(): String = view.toString
+  override def toString(): String = "SegmentList(%s)".format(view.mkString(", "))
 }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 529de9c..7396a99 100644
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -58,8 +58,10 @@ object DumpLogSegments {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file, false)
+    var validBytes = 0L
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
+      validBytes += MessageSet.entrySize(msg)
       print("offset: " + messageAndOffset.offset + " isvalid: " + msg.isValid +
             " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
             " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
@@ -72,6 +74,9 @@ object DumpLogSegments {
       }
       println()
     }
+    val trailingBytes = messageSet.sizeInBytes - validBytes
+    if(trailingBytes > 0)
+      println("Found %d invalid bytes at the end of %s".format(trailingBytes, file.getName))
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 6383a90..4a13f0d 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -17,6 +17,7 @@ class LogSegmentTest extends JUnit3Suite {
     val msFile = TestUtils.tempFile()
     val ms = new FileMessageSet(msFile, true)
     val idxFile = TestUtils.tempFile()
+    idxFile.delete()
     val idx = new OffsetIndex(idxFile, offset, true, 100)
     val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
     segments += seg
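Aside (not part of the patch): roundToExactMultiple keeps every file length and buffer position a whole number of 8-byte index slots (the same / 8 arithmetic used for maxEntries above), so a torn trailing entry can never be mapped. The arithmetic is plain integer division:

    // Integer division truncates, so this yields the largest multiple of
    // factor that is <= number.
    def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)

    assert(roundToExactMultiple(67, 8) == 64)
    assert(roundToExactMultiple(64, 8) == 64)
    assert(roundToExactMultiple(7, 8) == 0)
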
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index ba367ca..48c10c1 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -265,20 +265,29 @@ class LogTest extends JUnitSuite {
   @Test
   def testLogRecoversToCorrectOffset() {
     val numMessages = 100
-    var log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    val messageSize = 100
+    val segmentSize = 7 * messageSize
+    val indexInterval = 3 * messageSize
+    var log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(i.toString.getBytes))
+      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
+    val lastIndexOffset = log.segments.view.last.index.lastOffset
+    val numIndexEntries = log.segments.view.last.index.entries
     log.close()

     // test non-recovery case
-    log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+    log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
+    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
+    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
     log.close()

     // test
-    log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, time = time)
+    log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
     assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
+    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.segments.view.last.index.lastOffset)
+    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.segments.view.last.index.entries)
     log.close()
   }
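Aside (not part of the patch): the new index assertions pass because of the OffsetIndex change above: on reopen, the entry count and lastOffset are derived from the mapped file rather than reset to empty. A model of that arithmetic, assuming the on-disk length corresponds to fully written 8-byte entries (which is exactly what cannot be assumed after a crash, hence the needsRecovery path that wipes and rebuilds the index):

    // position = roundToExactMultiple(length, 8); entries = position / 8
    def recoveredState(fileLength: Int): (Int, Int) = {
      val position = 8 * (fileLength / 8)
      val entries  = position / 8   // was hard-coded to 0 for mutable indexes before
      (position, entries)
    }

    assert(recoveredState(4096) == (4096, 512))
    assert(recoveredState(20)   == (16, 2))     // a torn 4-byte tail is dropped
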
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 9266afb..6def192 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -26,20 +26,6 @@ import scala.collection._
 import scala.util.Random
 import kafka.utils._

-/*
- * Test cases:
- *  - empty index
- *  - first value
- *  - last value
- *  - non-present value
- *  - present value
- *  - random values
- *  - test immutability
- *  - test truncate
- *  - test lookup outside bounds
- *  - Extreme values in append
- *  - what value do we return if falls off the end?
- */
 class OffsetIndexTest extends JUnitSuite {

   var idx: OffsetIndex = null
@@ -47,7 +33,7 @@ class OffsetIndexTest extends JUnitSuite {

   @Before
   def setup() {
-    this.idx = new OffsetIndex(file = TestUtils.tempFile(), baseOffset = 45L, mutable = true, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, mutable = true, maxIndexSize = 30 * 8)
   }

   @After
@@ -143,12 +129,22 @@ class OffsetIndexTest extends JUnitSuite {

   @Test
   def truncate() {
-    val idx = new OffsetIndex(file = TestUtils.tempFile(), baseOffset = 0L, mutable = true, maxIndexSize = 10 * 8)
+    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, mutable = true, maxIndexSize = 10 * 8)
     for(i <- 1 until 10)
       idx.append(i, i)
+
+    idx.truncateTo(12)
+    assertEquals("Index should be unchanged by truncate past the end", OffsetPosition(9, 9), idx.lookup(10))
+    idx.truncateTo(10)
+    assertEquals("Index should be unchanged by truncate at the end", OffsetPosition(9, 9), idx.lookup(10))
+    idx.truncateTo(9)
+    assertEquals("Index should truncate off last entry", OffsetPosition(8, 8), idx.lookup(10))
     idx.truncateTo(5)
     assertEquals("4 should be the last entry in the index", OffsetPosition(4, 4), idx.lookup(10))
     assertEquals("4 should be the last entry in the index", 4, idx.lastOffset)
+
+    idx.truncate()
+    assertEquals("Full truncation should leave no entries", 0, idx.entries())
   }

   def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, klass: Class[T]) {
@@ -161,7 +157,7 @@ class OffsetIndexTest extends JUnitSuite {
   }

   def makeIndex(baseOffset: Long, mutable: Boolean, vals: Seq[(Long, Int)]): OffsetIndex = {
-    val idx = new OffsetIndex(file = TestUtils.tempFile, baseOffset = baseOffset, mutable = mutable, maxIndexSize = 2 * vals.size * 8)
+    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = baseOffset, mutable = mutable, maxIndexSize = 2 * vals.size * 8)
     for ((logical, physical) <- vals)
       idx.append(logical, physical)
     idx
@@ -177,4 +173,10 @@ class OffsetIndexTest extends JUnitSuite {
     }
     vals
   }
+
+  def nonExistantTempFile(): File = {
+    val file = TestUtils.tempFile()
+    file.delete()
+    file
+  }
 }
\ No newline at end of file
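Aside (not part of the patch): nonExistantTempFile() and the idxFile.delete() call in LogSegmentTest exist for the same reason. OffsetIndex now keys pre-allocation off file.createNewFile(), so handing it an already-created empty temp file would skip setLength and map a zero-length file. A standalone version of the same pattern, using only java.io.File (TestUtils.tempFile is Kafka's test helper, which creates the file on disk):

    import java.io.File

    // Reserve a unique path, then delete the empty file so that
    // File.createNewFile() inside OffsetIndex returns true and the
    // index file gets pre-allocated.
    def nonExistantTempFile(): File = {
      val file = File.createTempFile("kafka", ".index")
      file.delete()
      file
    }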