From 71086b28d99d54a8002c608a1434cd6ba245c880 Mon Sep 17 00:00:00 2001 From: Nick Howard Date: Mon, 14 Apr 2014 12:00:38 -0600 Subject: [PATCH] Ensure unflushed messages are not deleted prematurely by expire based segment cleanup --- core/src/main/scala/kafka/log/Log.scala | 13 +++++++------ core/src/test/scala/unit/kafka/log/LogTest.scala | 23 +++++++++++++++++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 46df8d9..3d3e998 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -403,19 +403,20 @@ class Log(val dir: File, def deleteOldSegments(predicate: LogSegment => Boolean): Int = { // find any segments that match the user-supplied predicate UNLESS it is the final segment // and it is empty (since we would just end up re-creating it - val lastSegment = activeSegment - var deletable = logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastSegment.baseOffset || s.size > 0)) - val numToDelete = deletable.size - if(numToDelete > 0) { - lock synchronized { + lock synchronized { + val lastSegment = activeSegment + var deletable = logSegments.takeWhile(s => predicate(s) && + (s.baseOffset != lastSegment.baseOffset || (s.size > 0 && unflushedMessages == 0))) + val numToDelete = deletable.size + if(numToDelete > 0) { // we must always have at least one segment, so if we are going to delete all the segments, create a new one first if(segments.size == numToDelete) roll() // remove the segments for lookups deletable.foreach(deleteSegment(_)) } + numToDelete } - numToDelete } /** diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1da1393..039ef54 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -89,6 +89,28 @@ class LogTest extends JUnitSuite { assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments) } + @Test + def testLastSegmentNotDeletedWhenUnflushedMessages() { + val set = TestUtils.singleMessageSet("test".getBytes()) + val log = new Log(logDir, logConfig.copy(segmentMs = 1 * 60 * 60L), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + + log.append(set) + assertEquals("Log has one unflushed message", 1, log.unflushedMessages) + log.flush() + + assertEquals("Log has zero unflushed message", 0, log.unflushedMessages) + log.append(set) + assertEquals("Log has one unflushed message", 1, log.unflushedMessages) + + val ct = log.deleteOldSegments(_ => true) + + assertEquals("log segments", 1, log.numberOfSegments) + assertEquals("log size", 60, log.size) + + val read = log.read(0, 100, Some(3)) + assertEquals("expect messages readable", 2, read.size) + } + /** * Test that appending more than the maximum segment size rolls the log */ @@ -526,6 +548,7 @@ class LogTest extends JUnitSuite { // append some messages to create some segments for(i <- 0 until 100) log.append(set) + log.flush() // files should be renamed val segments = log.logSegments.toArray -- 1.8.5.2 (Apple Git-48)