diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 88843cd..5ac719f 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -48,7 +48,8 @@ import com.yammer.metrics.core.Gauge * */ @threadsafe -class Log(val dir: File, +class Log(val dir: File, + val scheduler: Scheduler, val maxSegmentSize: Int, val maxMessageSize: Int, val flushInterval: Int = Int.MaxValue, @@ -56,6 +57,7 @@ class Log(val dir: File, val needsRecovery: Boolean, val maxIndexSize: Int = (10*1024*1024), val indexIntervalBytes: Int = 4096, + val segmentDeleteDelayMs: Long = 60000, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ @@ -90,22 +92,28 @@ class Log(val dir: File, val logSegments = new ConcurrentSkipListMap[Long, LogSegment] val ls = dir.listFiles() if(ls != null) { - for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) { - if(!file.canRead) - throw new IOException("Could not read file " + file) - val filename = file.getName() - val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong - val hasIndex = Log.indexFilename(dir, start).exists - val segment = new LogSegment(dir = dir, - startOffset = start, - indexIntervalBytes = indexIntervalBytes, - maxIndexSize = maxIndexSize) - if(!hasIndex) { - // this can only happen if someone manually deletes the index file - error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) - segment.recover(maxMessageSize) + for(file <- ls if file.isFile) { + val filename = file.getName + if(filename.endsWith(DeletedFileSuffix)) { + val deleted = file.delete() + if(!deleted) + warn("Attempt to delete defunct segment file %s failed.".format(filename)) + } else if(filename.endsWith(LogFileSuffix)) { + if(!file.canRead) + throw new IOException("Could not read file " + file) + val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong + val hasIndex = Log.indexFilename(dir, start).exists + val segment = new LogSegment(dir = dir, + startOffset = start, + indexIntervalBytes = indexIntervalBytes, + maxIndexSize = maxIndexSize) + if(!hasIndex) { + // this can only happen if someone manually deletes the index file + error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) + segment.recover(maxMessageSize) + } + logSegments.put(start, segment) } - logSegments.put(start, segment) } } @@ -332,10 +340,8 @@ class Log(val dir: File, if(segments.size == numToDelete) roll() // remove the segments for lookups - deletable.foreach(d => segments.remove(d.baseOffset)) + deletable.foreach(deleteSegment(_)) } - // do not lock around actual file deletion, it isn't O(1) on many filesystems - deletable.foreach(_.delete()) } numToDelete } @@ -425,7 +431,7 @@ class Log(val dir: File, } /** - * Delete this log from the filesystem entirely + * Completely delete this log directory and all contents from the file system with no delay */ def delete(): Unit = { logSegments.foreach(_.delete()) @@ -449,8 +455,7 @@ class Log(val dir: File, truncateFullyAndStartAt(targetOffset) } else { val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) - deletable.foreach(s => segments.remove(s.baseOffset)) - deletable.foreach(_.delete()) + deletable.foreach(deleteSegment(_)) activeSegment.truncateTo(targetOffset) this.nextOffset.set(targetOffset) } @@ -465,8 +470,7 @@ class Log(val 
dir: File, debug("Truncate and start log '" + name + "' to " + newOffset) lock synchronized { val segmentsToDelete = logSegments.toList - segments.clear() - segmentsToDelete.foreach(_.delete()) + segmentsToDelete.foreach(deleteSegment(_)) segments.put(newOffset, new LogSegment(dir, newOffset, @@ -493,6 +497,41 @@ class Log(val dir: File, override def toString() = "Log(" + this.dir + ")" + /** + * This method performs an asynchronous log segment delete by doing the following: + *
    + *
  1. It removes the segment from the segment map so that it will no longer be used for reads. + *
  2. It renames the index and log files by appending .deleted to their file names + *
  3. It schedules an asynchronous delete operation to occur in the future + *
+ * This allows reads to happen concurrently without synchronization and without the possibility of physically + * deleting a file while it is being read from. + * + * @param segment The log segment to schedule for deletion + */ + private def deleteSegment(segment: LogSegment) { + debug("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, dir.getName)) + lock synchronized { + segments.remove(segment.baseOffset) + val deletedLog = new File(segment.log.file.getPath + Log.DeletedFileSuffix) + val deletedIndex = new File(segment.index.file.getPath + Log.DeletedFileSuffix) + val renamedLog = segment.log.file.renameTo(deletedLog) + val renamedIndex = segment.index.file.renameTo(deletedIndex) + if(!renamedLog && segment.log.file.exists) + throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.log.file.getPath, deletedLog.getPath, name)) + if(!renamedIndex && segment.index.file.exists) + throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.index.file.getPath, deletedIndex.getPath, name)) + def asyncDeleteFiles() { + debug("Deleting log segment %s for log %s.".format(segment.baseOffset, name)) + if(!deletedLog.delete()) + warn("Failed to delete log segment file %s for log %s.".format(deletedLog.getPath, name)) + if(!deletedIndex.delete()) + warn("Failed to delete index segment file %s for log %s.".format(deletedIndex.getPath, name)) + } + scheduler.schedule("delete-log-segment", asyncDeleteFiles, delay = segmentDeleteDelayMs) + } + } + } /** @@ -501,6 +540,7 @@ class Log(val dir: File, object Log { val LogFileSuffix = ".log" val IndexFileSuffix = ".index" + val DeletedFileSuffix = ".deleted" /** * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index c5e0e81..b8cc3be 100644 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -36,9 +36,9 @@ import kafka.server.KafkaConfig * A background thread handles log retention by periodically truncating excess log segments. 
*/ @threadsafe -private[kafka] class LogManager(val config: KafkaConfig, - scheduler: Scheduler, - private val time: Time) extends Logging { +class LogManager(val config: KafkaConfig, + scheduler: Scheduler, + private val time: Time) extends Logging { val CleanShutdownFile = ".kafka_cleanshutdown" val LockFile = ".lock" @@ -116,7 +116,8 @@ private[kafka] class LogManager(val config: KafkaConfig, val topicPartition = parseTopicPartitionName(dir.getName) val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize) - val log = new Log(dir, + val log = new Log(dir, + scheduler, maxLogFileSize, config.maxMessageSize, logFlushInterval, @@ -124,6 +125,7 @@ private[kafka] class LogManager(val config: KafkaConfig, needsRecovery, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, + config.logDeleteDelayMs, time) val previous = this.logs.put(topicPartition, log) if(previous != null) @@ -198,6 +200,7 @@ private[kafka] class LogManager(val config: KafkaConfig, val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize) log = new Log(dir, + scheduler, maxLogFileSize, config.maxMessageSize, logFlushInterval, @@ -205,6 +208,7 @@ private[kafka] class LogManager(val config: KafkaConfig, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, + config.logDeleteDelayMs, time) info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath)) logs.put(topicAndPartition, log) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1431dbc..13b2484 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -115,6 +115,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the number of messages accumulated on a log partition before messages are flushed to disk */ val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue)) + + val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue)) /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */ val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt) diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala index 75c33e0..67fb168 100644 --- a/core/src/test/scala/other/kafka/TestLogPerformance.scala +++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala @@ -18,7 +18,7 @@ package kafka.log import kafka.message._ -import kafka.utils.{SystemTime, TestUtils, Utils} +import kafka.utils.{SystemTime, TestUtils, Utils, KafkaScheduler} import kafka.server.KafkaConfig object TestLogPerformance { @@ -33,7 +33,8 @@ object TestLogPerformance { val props = TestUtils.createBrokerConfig(0, -1) val config = new KafkaConfig(props) val dir = TestUtils.tempDir() - val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime) + val scheduler = new KafkaScheduler(1) + val log = new Log(dir, scheduler, 50*1024*1024, 
config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime) val bytes = new Array[Byte](messageSize) new java.util.Random().nextBytes(bytes) val message = new Message(bytes) diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 8ba5c48..7fc154f 100644 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -85,19 +85,19 @@ class LogManagerTest extends JUnit3Suite { def testCleanupExpiredSegments() { val log = logManager.getOrCreateLog(name, 0) var offset = 0L - for(i <- 0 until 1000) { + for(i <- 0 until 200) { var set = TestUtils.singleMessageSet("test".getBytes()) val info = log.append(set) offset = info.lastOffset } - assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) - - // update the last modified time of all log segments + log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds)) - + time.sleep(maxLogAgeHours*60*60*1000 + 1) - assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments) + assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments) + time.sleep(log.segmentDeleteDelayMs + 1) + assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes) try { @@ -131,18 +131,21 @@ class LogManagerTest extends JUnit3Suite { var offset = 0L // add a bunch of messages that should be larger than the retentionSize - for(i <- 0 until 1000) { + val numMessages = 200 + for(i <- 0 until numMessages) { val set = TestUtils.singleMessageSet("test".getBytes()) val info = log.append(set) offset = info.firstOffset } // should be exactly 100 full segments + 1 new empty one - assertEquals("There should be example 100 segments.", 100, log.numberOfSegments) + assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logFileSize, log.numberOfSegments) // this cleanup shouldn't find any expired segments but should delete some to reduce size time.sleep(logManager.InitialTaskDelayMs) assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments) + time.sleep(log.segmentDeleteDelayMs + 1) + assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length) assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes) try { log.read(0, 1024) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 87a89ee..fbd738a 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -65,7 +65,7 @@ class LogTest extends JUnitSuite { val time: MockTime = new MockTime() // create a log - val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time) time.sleep(rollMs + 1) // segment age is less than its limit @@ -98,7 +98,7 @@ class LogTest extends JUnitSuite { val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) + val log = new 
Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -114,7 +114,7 @@ class LogTest extends JUnitSuite { @Test def testLoadEmptyLog() { createEmptyLogs(logDir, 0) - val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) log.append(TestUtils.singleMessageSet("test".getBytes)) } @@ -123,7 +123,7 @@ class LogTest extends JUnitSuite { */ @Test def testAppendAndReadWithSequentialOffsets() { - val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray for(i <- 0 until messages.length) @@ -142,7 +142,7 @@ class LogTest extends JUnitSuite { */ @Test def testAppendAndReadWithNonSequentialOffsets() { - val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val messages = messageIds.map(id => new Message(id.toString.getBytes)) @@ -165,7 +165,7 @@ class LogTest extends JUnitSuite { */ @Test def testReadAtLogGap() { - val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -185,7 +185,7 @@ class LogTest extends JUnitSuite { @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes) try { log.read(0, 1024) @@ -208,7 +208,7 @@ class LogTest extends JUnitSuite { @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) val offsets = messageSets.map(log.append(_).firstOffset) @@ -232,7 +232,7 @@ class LogTest extends JUnitSuite { @Test def testCompressedMessages() { /* this log should roll after every messageset */ - val log = new Log(logDir, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, 
needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) @@ -255,7 +255,7 @@ class LogTest extends JUnitSuite { for(messagesToAppend <- List(0, 1, 25)) { logDir.mkdirs() // first test a log segment starting at 0 - val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) for(i <- 0 until messagesToAppend) log.append(TestUtils.singleMessageSet(i.toString.getBytes)) @@ -289,7 +289,7 @@ class LogTest extends JUnitSuite { // append messages to log val maxMessageSize = second.sizeInBytes - 1 - val log = new Log(logDir, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time) // should be able to append the small message log.append(first) @@ -311,7 +311,7 @@ class LogTest extends JUnitSuite { val messageSize = 100 val segmentSize = 7 * messageSize val indexInterval = 3 * messageSize - var log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) + var log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset) @@ -320,14 +320,14 @@ class LogTest extends JUnitSuite { log.close() // test non-recovery case - log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) + log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) log.close() // test recovery case - log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) + log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset) assertEquals("Should have 
same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset) assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries) @@ -341,7 +341,7 @@ class LogTest extends JUnitSuite { def testIndexRebuild() { // publish the messages and close the log val numMessages = 200 - var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) + var log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) val indexFiles = log.logSegments.map(_.index.file) @@ -351,7 +351,7 @@ class LogTest extends JUnitSuite { indexFiles.foreach(_.delete()) // reopen the log - log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) + log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096) assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset) for(i <- 0 until numMessages) @@ -370,7 +370,7 @@ class LogTest extends JUnitSuite { val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages // create a log - val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -422,7 +422,7 @@ class LogTest extends JUnitSuite { val setSize = set.sizeInBytes val msgPerSeg = 10 val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages - val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) + val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) log.append(set) @@ -448,6 +448,7 @@ class LogTest extends JUnitSuite { // create a log var log = new Log(logDir, + time.scheduler, maxSegmentSize = set.sizeInBytes * 5, maxMessageSize = config.maxMessageSize, maxIndexSize = 1000, @@ -459,6 +460,7 @@ class LogTest extends JUnitSuite { log.append(set) log.close() log = new Log(logDir, + time.scheduler, maxSegmentSize = set.sizeInBytes * 5, maxMessageSize = config.maxMessageSize, maxIndexSize = 1000, @@ -469,4 +471,68 @@ class LogTest extends JUnitSuite { assertEquals("Log end offset should be 3.", 3, log.logEndOffset) } + /** + * Test that deleted files are deleted after the appropriate time. 
+ */ + @Test + def testAsyncDelete() { + val set = TestUtils.singleMessageSet("test".getBytes()) + val asyncDeleteMs = 1000 + val log = new Log(logDir, + time.scheduler, + maxSegmentSize = set.sizeInBytes * 5, + maxMessageSize = config.maxMessageSize, + maxIndexSize = 1000, + indexIntervalBytes = 10000, + segmentDeleteDelayMs = asyncDeleteMs, + needsRecovery = true) + + // append some messages to create some segments + for(i <- 0 until 100) + log.append(set) + + // files should be renamed + val segments = log.logSegments.toArray + log.deleteOldSegments((s) => true) + + assertEquals("Only one segment should remain.", 1, log.numberOfSegments) + val renamed = segments.map(segment => new File(segment.log.file.getPath + Log.DeletedFileSuffix)) + assertTrue("Files should all be renamed to .deleted.", renamed.forall(_.exists)) + + // when enough time passes the files should be deleted + time.sleep(asyncDeleteMs + 1) + assertTrue("Files should all be gone.", renamed.forall(!_.exists)) + } + + /** + * Any files ending in .deleted should be removed when the log is re-opened. + */ + @Test + def testOpenDeletesObsoleteFiles() { + val set = TestUtils.singleMessageSet("test".getBytes()) + var log = new Log(logDir, + time.scheduler, + maxSegmentSize = set.sizeInBytes * 5, + maxMessageSize = config.maxMessageSize, + maxIndexSize = 1000, + indexIntervalBytes = 10000, + needsRecovery = false) + + // append some messages to create some segments + for(i <- 0 until 100) + log.append(set) + + log.deleteOldSegments((s) => true) + log.close() + + log = new Log(logDir, + time.scheduler, + maxSegmentSize = set.sizeInBytes * 5, + maxMessageSize = config.maxMessageSize, + maxIndexSize = 1000, + indexIntervalBytes = 10000, + needsRecovery = false) + assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) + } + } diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala index accb7b2..883442d 100644 --- a/core/src/test/scala/unit/kafka/message/MessageTest.scala +++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala @@ -17,8 +17,8 @@ package kafka.message -import java.util._ import java.nio._ +import java.util.HashMap import scala.collection._ import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite @@ -45,7 +45,7 @@ class MessageTest extends JUnitSuite { } @Test - def testFieldValues = { + def testFieldValues { for(v <- messages) { TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload) assertEquals(Message.CurrentMagicValue, v.message.magic) @@ -69,7 +69,7 @@ class MessageTest extends JUnitSuite { } @Test - def testEquality() = { + def testEquality() { for(v <- messages) { assertFalse("Should not equal null", v.message.equals(null)) assertFalse("Should not equal a random string", v.message.equals("asdf")) @@ -80,7 +80,7 @@ class MessageTest extends JUnitSuite { } @Test - def testIsHashable() = { + def testIsHashable() { // this is silly, but why not val m = new HashMap[Message, Message]() for(v <- messages) diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala index 724ffcf..4853f2b 100644 --- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala +++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala @@ -16,7 +16,7 @@ */ package kafka.utils -import scala.collection._ +import scala.collection.mutable.PriorityQueue import java.util.concurrent.TimeUnit /** @@ -30,16 +30,13 @@ import 
java.util.concurrent.TimeUnit * time.sleep(1001) // this should cause our scheduled task to fire * * - * Two gotchas: - *
    - *
  1. Incrementing the time by more than one task period will result in the correct number of executions of each scheduled task - * but the order of these executions is not specified. - *
  2. Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time) - *
+ * Incrementing the time to the exact next execution time of a task will result in that task executing (it is as if execution itself takes no time). */ +@nonthreadsafe class MockScheduler(val time: Time) extends Scheduler { - var tasks = mutable.ArrayBuffer[MockScheduled]() + /* a priority queue of tasks ordered by next execution time */ + var tasks = new PriorityQueue[MockTask]() def startup() {} @@ -50,34 +47,38 @@ class MockScheduler(val time: Time) extends Scheduler { /** * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs * when this method is called and the execution happens synchronously in the calling thread. - * If you are using the scheduler associated with a MockTime instance this call will happen automatically. + * If you are using the scheduler associated with a MockTime instance this call will be triggered automatically. */ def tick() { - var tasks = mutable.ArrayBuffer[MockScheduled]() val now = time.milliseconds - for(task <- this.tasks) { - if(task.nextExecution <= now) { - if(task.period >= 0) { - val executions = (now - task.nextExecution) / task.period - for(i <- 0 to executions.toInt) - task.fun() - task.nextExecution += (executions + 1) * task.period - tasks += task - } else { - task.fun() - } - } else { - tasks += task + while(!tasks.isEmpty && tasks.head.nextExecution <= now) { + /* pop and execute the task with the lowest next execution time */ + val curr = tasks.head + this.tasks = tasks.tail + curr.fun() + /* if the task is periodic, reschedule it and re-enqueue */ + if(curr.periodic) { + curr.nextExecution += curr.period + this.tasks += curr } } - this.tasks = tasks } def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) { - tasks += MockScheduled(name, fun, time.milliseconds + delay, period = period) + tasks += MockTask(name, fun, time.milliseconds + delay, period = period) tick() } } -case class MockScheduled(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) \ No newline at end of file +case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) extends Ordered[MockTask] { + def periodic = period >= 0 + def compare(t: MockTask): Int = { + if(t.nextExecution == nextExecution) + return 0 + else if (t.nextExecution < nextExecution) + return -1 + else + return 1 + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala index ae16a71..ec27ef9 100644 --- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala +++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala @@ -67,6 +67,13 @@ class SchedulerTest { } @Test + def testReentrantTaskInMockScheduler() { + mockTime.scheduler.schedule("test1", () => mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=0), delay=1) + mockTime.sleep(1) + assertEquals(1, counter2.get) + } + + @Test def testNonPeriodicTask() { scheduler.schedule("test", counter1.getAndIncrement, delay = 0) retry(30000, () => assertEquals(counter1.get, 1))