diff --git core/src/main/scala/kafka/log/FileMessageSet.scala core/src/main/scala/kafka/log/FileMessageSet.scala
index bc188d9..be60c24 100644
--- core/src/main/scala/kafka/log/FileMessageSet.scala
+++ core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -31,45 +31,35 @@ import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 /**
  * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
  * will fail on an immutable message set. An optional limit and start position can be applied to the message set
- * which will control the position in the file at which the set begins
+ * which will control the position in the file at which the set begins.
  */
 @nonthreadsafe
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
-                                    private[log] val start: Long, // the starting position in the file
-                                    private[log] val limit: Long, // the length (may be less than the file length)
-                                    val mutable: Boolean) extends MessageSet with Logging {
+                                    private[log] val start: Long = 0L,
+                                    private[log] val limit: Long = Long.MaxValue) extends MessageSet with Logging {
 
-  private val setSize = new AtomicLong()
-
-  if(mutable) {
-    if(limit < Long.MaxValue || start > 0)
-      throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
-
-    setSize.set(channel.size())
-    channel.position(channel.size)
-  } else {
-    setSize.set(scala.math.min(channel.size(), limit) - start)
-  }
+  /* the size of the message set in bytes */
+  private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
+
+  /* set the file position to the last byte in the file */
+  channel.position(channel.size)
 
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, channel: FileChannel, mutable: Boolean) =
-    this(file, channel, 0, Long.MaxValue, mutable)
+  def this(file: File, channel: FileChannel) = this(file, channel, 0, Long.MaxValue)
 
   /**
    * Create a file message set with no limit or offset
    */
-  def this(file: File, mutable: Boolean) =
-    this(file, Utils.openChannel(file, mutable), mutable)
+  def this(file: File) = this(file, Utils.openChannel(file, mutable = true))
 
   /**
    * Return a message set which is a view into this set starting from the given position and with the given size limit.
   */
   def read(position: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()),
-                       false)
+    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()))
   }
 
   /**
@@ -79,7 +69,7 @@ class FileMessageSet private[kafka](val file: File,
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(12)
-    val size = setSize.get()
+    val size = _size.get()
     while(position + 12 < size) {
       buffer.rewind()
       channel.read(buffer, position)
@@ -138,29 +128,22 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * The number of bytes taken up by this file set
    */
-  def sizeInBytes(): Long = setSize.get()
-
-  def checkMutable(): Unit = {
-    if(!mutable)
-      throw new KafkaException("Attempt to invoke mutation on immutable message set.")
-  }
+  def sizeInBytes(): Long = _size.get()
 
   /**
    * Append this message to the message set
   */
   def append(messages: MessageSet): Unit = {
-    checkMutable()
     var written = 0L
     while(written < messages.sizeInBytes)
      written += messages.writeTo(channel, 0, messages.sizeInBytes)
-    setSize.getAndAdd(written)
+    _size.getAndAdd(written)
   }
 
   /**
    * Commit all written data to the physical disk
   */
   def flush() = {
-    checkMutable()
     LogFlushStats.logFlushTimer.time {
       channel.force(true)
     }
@@ -170,8 +153,7 @@ class FileMessageSet private[kafka](val file: File,
    * Close this message set
   */
   def close() {
-    if(mutable)
-      flush()
+    flush()
     channel.close()
   }
 
@@ -188,13 +170,12 @@ class FileMessageSet private[kafka](val file: File,
    * given size falls on a valid byte offset.
   */
   def truncateTo(targetSize: Long) = {
-    checkMutable()
     if(targetSize > sizeInBytes())
       throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
         " size of this log segment is only %d bytes".format(sizeInBytes()))
     channel.truncate(targetSize)
     channel.position(targetSize)
-    setSize.set(targetSize)
+    _size.set(targetSize)
   }
 
 }
diff --git core/src/main/scala/kafka/log/Log.scala core/src/main/scala/kafka/log/Log.scala
index 4279f86..e94e472 100644
--- core/src/main/scala/kafka/log/Log.scala
+++ core/src/main/scala/kafka/log/Log.scala
@@ -101,8 +101,8 @@ object Log {
 private[kafka] class Log(val dir: File,
                          val maxLogFileSize: Long,
                          val maxMessageSize: Int,
-                         val flushInterval: Int,
-                         val rollIntervalMs: Long,
+                         val flushInterval: Int = Int.MaxValue,
+                         val rollIntervalMs: Long = Long.MaxValue,
                          val needsRecovery: Boolean,
                          val maxIndexSize: Int = (10*1024*1024),
                          val indexIntervalBytes: Int = 4096,
@@ -151,8 +151,7 @@ private[kafka] class Log(val dir: File,
           if(!Log.indexFilename(dir, start).exists)
             throw new IllegalStateException("Found log file with no corresponding index file.")
           logSegments.add(new LogSegment(dir = dir,
-                                         startOffset = start,
-                                         mutable = false,
+                                         startOffset = start,
                                          indexIntervalBytes = indexIntervalBytes,
                                          maxIndexSize = maxIndexSize))
         }
@@ -161,8 +160,7 @@ private[kafka] class Log(val dir: File,
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment
       logSegments.add(new LogSegment(dir = dir,
-                                     startOffset = 0,
-                                     mutable = true,
+                                     startOffset = 0,
                                      indexIntervalBytes = indexIntervalBytes,
                                      maxIndexSize = maxIndexSize))
     } else {
@@ -176,17 +174,9 @@ private[kafka] class Log(val dir: File,
         }
       })
 
-      //make the final section mutable and run recovery on it if necessary
-      val last = logSegments.remove(logSegments.size - 1)
-      last.close()
-      val mutableSegment = new LogSegment(dir = dir,
-                                          startOffset = last.start,
-                                          mutable = true,
-                                          indexIntervalBytes = indexIntervalBytes,
-                                          maxIndexSize = maxIndexSize)
+      // run recovery on the last segment if necessary
       if(needsRecovery)
-        recoverSegment(mutableSegment)
-      logSegments.add(mutableSegment)
+        recoverSegment(logSegments.get(logSegments.size - 1))
     }
     new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
   }
@@ -406,12 +396,11 @@ private[kafka] class Log(val dir: File,
     }
     debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
     segments.view.lastOption match {
-      case Some(segment) => segment.index.makeReadOnly()
+      case Some(segment) => segment.index.trimToSize()
       case None =>
     }
     val segment = new LogSegment(dir,
                                  startOffset = newOffset,
-                                 mutable = true,
                                  indexIntervalBytes = indexIntervalBytes,
                                  maxIndexSize = maxIndexSize)
     segments.append(segment)
@@ -546,8 +535,7 @@ private[kafka] class Log(val dir: File,
     val deletedSegments = segments.trunc(segments.view.size)
     debug("Truncate and start log '" + name + "' to " + newOffset)
     segments.append(new LogSegment(dir,
-                                   newOffset,
-                                   mutable = true,
+                                   newOffset,
                                    indexIntervalBytes = indexIntervalBytes,
                                    maxIndexSize = maxIndexSize))
     deleteSegments(deletedSegments)
diff --git core/src/main/scala/kafka/log/LogSegment.scala core/src/main/scala/kafka/log/LogSegment.scala
index aaf03e8..4bf3939 100644
--- core/src/main/scala/kafka/log/LogSegment.scala
+++ core/src/main/scala/kafka/log/LogSegment.scala
@@ -28,9 +28,9 @@ class LogSegment(val messageSet: FileMessageSet,
 
   @volatile var deleted = false
 
-  def this(dir: File, startOffset: Long, mutable: Boolean, indexIntervalBytes: Int, maxIndexSize: Int) =
-    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), mutable = mutable),
-         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, mutable = mutable, maxIndexSize = maxIndexSize),
+  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) =
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
+         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
         SystemTime)
diff --git core/src/main/scala/kafka/log/OffsetIndex.scala core/src/main/scala/kafka/log/OffsetIndex.scala
index aa5af1d..e8eb554 100644
--- core/src/main/scala/kafka/log/OffsetIndex.scala
+++ core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -49,39 +49,36 @@ import kafka.utils._
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
  * storage format.
  */
-class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, maxIndexSize: Int = -1) extends Logging {
-
+class OffsetIndex(val file: File, val baseOffset: Long, maxIndexSize: Int = -1) extends Logging {
+
   /* the memory mapping */
   private var mmap: MappedByteBuffer = {
     val newlyCreated = file.createNewFile()
     val raf = new RandomAccessFile(file, "rw")
     try {
-      if(mutable) {
-        /* if mutable create and memory map a new sparse file */
+      /* pre-allocate the file if necessary */
+      if(newlyCreated) {
         if(maxIndexSize < 8)
           throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
+      }
 
-        /* pre-allocate the file if necessary */
-        if(newlyCreated)
-          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
+      val len = raf.length()
+      if(len < 0 || len % 8 != 0)
+        throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len +
+                                        " bytes which is not positive or not a multiple of 8.")
 
-        /* set the position in the index for the next entry */
-        if(newlyCreated)
-          idx.position(0)
-        else
-          // if this is a pre-existing index, assume it is all valid and set position to last entry
-          idx.position(roundToExactMultiple(idx.limit, 8))
-        idx
-      } else {
-        /* if not mutable, just mmap what they gave us */
-        val len = raf.length()
-        if(len < 0 || len % 8 != 0)
-          throw new IllegalStateException("Index file " + file.getName + " is corrupt, found " + len +
-                                          " bytes which is not positive or not a multiple of 8.")
-        raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
-      }
+      /* memory-map the file */
+      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+
+      /* set the position in the index for the next entry */
+      if(newlyCreated)
+        idx.position(0)
+      else
+        // if this is a pre-existing index, assume it is all valid and set position to last entry
+        idx.position(roundToExactMultiple(idx.limit, 8))
+      idx
     } finally {
       Utils.swallow(raf.close())
     }
@@ -91,7 +88,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
   val maxEntries = mmap.limit / 8
 
   /* the number of entries in the index */
-  private var size = if(mutable) new AtomicInteger(mmap.position / 8) else new AtomicInteger(mmap.limit / 8)
+  private var size = new AtomicInteger(mmap.position / 8)
 
   /* the last offset in the index */
   var lastOffset = readLastOffset()
@@ -115,8 +112,6 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
    * the pair (baseOffset, 0) is returned.
   */
   def lookup(targetOffset: Long): OffsetPosition = {
-    if(entries == 0)
-      return OffsetPosition(baseOffset, 0)
     val idx = mmap.duplicate
     val slot = indexSlotFor(idx, targetOffset)
     if(slot == -1)
@@ -128,16 +123,20 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
   /**
    * Find the slot in which the largest offset less than or equal to the given
    * target offset is stored.
-   * Return -1 if the least entry in the index is larger than the target offset
+   * Return -1 if the least entry in the index is larger than the target offset or the index is empty
   */
   private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
     // we only store the difference from the baseoffset so calculate that
     val relativeOffset = targetOffset - baseOffset
 
+    // check if the index is empty
+    if(entries == 0)
+      return -1
+
     // check if the target offset is smaller than the least offset
     if(logical(idx, 0) > relativeOffset)
       return -1
-
+
     // binary search for the entry
     var lo = 0
     var hi = entries-1
@@ -175,8 +174,6 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
   */
   def append(logicalOffset: Long, position: Int) {
     this synchronized {
-      if(!mutable)
-        throw new IllegalStateException("Attempt to append to an immutable offset index " + file.getName)
       if(isFull)
         throw new IllegalStateException("Attempt to append to a full index (size = " + size + ").")
       if(size.get > 0 && logicalOffset <= lastOffset)
@@ -227,17 +224,17 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
   }
 
   /**
-   * Make this segment read-only, flush any unsaved changes, and truncate any excess bytes
+   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
+   * the file.
   */
-  def makeReadOnly() {
+  def trimToSize() {
     this synchronized {
-      mutable = false
       flush()
       val raf = new RandomAccessFile(file, "rws")
       try {
         val newLength = entries * 8
         raf.setLength(newLength)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, newLength)
+        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, newLength)
       } finally {
         Utils.swallow(raf.close())
       }
@@ -265,8 +262,7 @@ class OffsetIndex(val file: File, val baseOffset: Long, var mutable: Boolean, ma
 
   /** Close the index */
   def close() {
-    if(mutable)
-      makeReadOnly()
+    trimToSize()
   }
 
   /**
diff --git core/src/main/scala/kafka/tools/DumpLogSegments.scala core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 7396a99..947aff3 100644
--- core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -43,7 +43,7 @@ object DumpLogSegments {
   /* print out the contents of the index */
   def dumpIndex(file: File) {
     val startOffset = file.getName().split("\\.")(0).toLong
-    val index = new OffsetIndex(file = file, baseOffset = startOffset, mutable = false)
+    val index = new OffsetIndex(file = file, baseOffset = startOffset)
     for(i <- 0 until index.entries) {
       val entry = index.entry(i)
       // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
@@ -57,7 +57,7 @@ object DumpLogSegments {
   def dumpLog(file: File, printContents: Boolean) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
-    val messageSet = new FileMessageSet(file, false)
+    val messageSet = new FileMessageSet(file)
     var validBytes = 0L
     for(messageAndOffset <- messageSet) {
       val msg = messageAndOffset.message
diff --git core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index f06e537..d0044cf 100644
--- core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -29,7 +29,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   val messageSet = createMessageSet(messages)
 
   def createMessageSet(messages: Seq[Message]): FileMessageSet = {
-    val set = new FileMessageSet(tempFile(), true)
+    val set = new FileMessageSet(tempFile())
     set.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
     set.flush()
     set
diff --git core/src/test/scala/unit/kafka/log/LogSegmentTest.scala core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 4a13f0d..8e57514 100644
--- core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -15,10 +15,10 @@ class LogSegmentTest extends JUnit3Suite {
 
   def createSegment(offset: Long): LogSegment = {
     val msFile = TestUtils.tempFile()
-    val ms = new FileMessageSet(msFile, true)
+    val ms = new FileMessageSet(msFile)
     val idxFile = TestUtils.tempFile()
     idxFile.delete()
-    val idx = new OffsetIndex(idxFile, offset, true, 100)
+    val idx = new OffsetIndex(idxFile, offset, 100)
     val seg = new LogSegment(ms, idx, offset, 10, SystemTime)
     segments += seg
     seg
diff --git core/src/test/scala/unit/kafka/log/LogTest.scala core/src/test/scala/unit/kafka/log/LogTest.scala
index 48c10c1..2f49139 100644
--- core/src/test/scala/unit/kafka/log/LogTest.scala
+++ core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -342,6 +342,33 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change log size", log.size, 0)
   }
 
+  @Test
+  def testReopenThenTruncate() {
+    val set = TestUtils.singleMessageSet("test".getBytes())
+
+    // create a log
+    var log = new Log(logDir,
+                      maxLogFileSize = set.sizeInBytes * 5,
+                      maxMessageSize = config.maxMessageSize,
+                      maxIndexSize = 1000,
+                      indexIntervalBytes = 10000,
+                      needsRecovery = true)
+
+    // add enough messages to roll over several segments then close and re-open and attempt to truncate
+    for(i <- 0 until 100)
+      log.append(set)
+    log.close()
+    log = new Log(logDir,
+                  maxLogFileSize = set.sizeInBytes * 5,
+                  maxMessageSize = config.maxMessageSize,
+                  maxIndexSize = 1000,
+                  indexIntervalBytes = 10000,
+                  needsRecovery = true)
+    log.truncateTo(3)
+    assertEquals("All but one segment should be deleted.", 1, log.numberOfSegments)
+    assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
+  }
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) =>
diff --git core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 6def192..91c9881 100644
--- core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -33,7 +33,7 @@ class OffsetIndexTest extends JUnitSuite {
 
   @Before
   def setup() {
-    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, mutable = true, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
   }
 
   @After
@@ -41,7 +41,7 @@
     if(this.idx != null)
       this.idx.file.delete()
   }
-
+
   @Test
   def randomLookupTest() {
     assertEquals("Not present value should return physical offset 0.", OffsetPosition(idx.baseOffset, 0), idx.lookup(92L))
@@ -88,25 +88,6 @@
     }
     assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalStateException])
   }
-
-
-  @Test
-  def testReadOnly() {
-    /* add some random values */
-    val vals = List((49, 1), (52, 2), (55, 3))
-    for((logical, physical) <- vals)
-      idx.append(logical, physical)
-
-    idx.makeReadOnly()
-
-    assertEquals("File length should just contain added entries.", vals.size * 8L, idx.file.length())
-
-    assertEquals("Last offset field should be initialized", vals.last._1, idx.lastOffset)
-
-    for((logical, physical) <- vals)
-      assertEquals("Should still be able to find everything.", OffsetPosition(logical, physical), idx.lookup(logical))
-
-    assertWriteFails("Append should fail on read-only index", idx, 60, classOf[IllegalStateException])
-  }
 
   @Test(expected = classOf[IllegalArgumentException])
   def appendOutOfOrder() {
@@ -115,13 +96,13 @@
   }
 
   @Test
-  def reopenAsReadonly() {
+  def testReopen() {
     val first = OffsetPosition(51, 0)
     val sec = OffsetPosition(52, 1)
     idx.append(first.offset, first.position)
     idx.append(sec.offset, sec.position)
     idx.close()
-    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset, mutable = false)
+    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
     assertEquals(first, idxRo.lookup(first.offset))
     assertEquals(sec, idxRo.lookup(sec.offset))
     assertWriteFails("Append should fail on read-only index", idxRo, 53, classOf[IllegalStateException])
@@ -129,7 +110,8 @@
 
   @Test
   def truncate() {
-    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, mutable = true, maxIndexSize = 10 * 8)
+    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+    idx.truncate()
     for(i <- 1 until 10)
       idx.append(i, i)
 
@@ -155,13 +137,6 @@
       case e: Exception => assertEquals("Got an unexpected exception.", klass, e.getClass)
     }
   }
-
-  def makeIndex(baseOffset: Long, mutable: Boolean, vals: Seq[(Long, Int)]): OffsetIndex = {
-    val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = baseOffset, mutable = mutable, maxIndexSize = 2 * vals.size * 8)
-    for ((logical, physical) <- vals)
-      idx.append(logical, physical)
-    idx
-  }
 
   def monotonicSeq(base: Int, len: Int): Seq[Int] = {
     val rand = new Random(1L)
diff --git core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index c436f3d..8b9381f 100644
--- core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -67,7 +67,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
     val channel = new RandomAccessFile(file, "rw").getChannel()
     val written = set.writeTo(channel, 0, 1024)
     assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
-    val newSet = new FileMessageSet(file, channel, false)
+    val newSet = new FileMessageSet(file, channel)
     checkEquals(set.iterator, newSet.iterator)
   }
 }