diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 88843cd..5ac719f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -48,7 +48,8 @@ import com.yammer.metrics.core.Gauge
*
*/
@threadsafe
-class Log(val dir: File,
+class Log(val dir: File,
+ val scheduler: Scheduler,
val maxSegmentSize: Int,
val maxMessageSize: Int,
val flushInterval: Int = Int.MaxValue,
@@ -56,6 +57,7 @@ class Log(val dir: File,
val needsRecovery: Boolean,
val maxIndexSize: Int = (10*1024*1024),
val indexIntervalBytes: Int = 4096,
+ val segmentDeleteDelayMs: Long = 60000,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
import kafka.log.Log._
@@ -90,22 +92,28 @@ class Log(val dir: File,
val logSegments = new ConcurrentSkipListMap[Long, LogSegment]
val ls = dir.listFiles()
if(ls != null) {
- for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) {
- if(!file.canRead)
- throw new IOException("Could not read file " + file)
- val filename = file.getName()
- val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
- val hasIndex = Log.indexFilename(dir, start).exists
- val segment = new LogSegment(dir = dir,
- startOffset = start,
- indexIntervalBytes = indexIntervalBytes,
- maxIndexSize = maxIndexSize)
- if(!hasIndex) {
- // this can only happen if someone manually deletes the index file
- error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
- segment.recover(maxMessageSize)
+ for(file <- ls if file.isFile) {
+ val filename = file.getName
+ if(filename.endsWith(DeletedFileSuffix)) {
+ val deleted = file.delete()
+ if(!deleted)
+ warn("Attempt to delete defunct segment file %s failed.".format(filename))
+ } else if(filename.endsWith(LogFileSuffix)) {
+ if(!file.canRead)
+ throw new IOException("Could not read file " + file)
+ val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+ val hasIndex = Log.indexFilename(dir, start).exists
+ val segment = new LogSegment(dir = dir,
+ startOffset = start,
+ indexIntervalBytes = indexIntervalBytes,
+ maxIndexSize = maxIndexSize)
+ if(!hasIndex) {
+ // this can only happen if someone manually deletes the index file
+ error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
+ segment.recover(maxMessageSize)
+ }
+ logSegments.put(start, segment)
}
- logSegments.put(start, segment)
}
}
@@ -332,10 +340,8 @@ class Log(val dir: File,
if(segments.size == numToDelete)
roll()
// remove the segments for lookups
- deletable.foreach(d => segments.remove(d.baseOffset))
+ deletable.foreach(deleteSegment(_))
}
- // do not lock around actual file deletion, it isn't O(1) on many filesystems
- deletable.foreach(_.delete())
}
numToDelete
}
@@ -425,7 +431,7 @@ class Log(val dir: File,
}
/**
- * Delete this log from the filesystem entirely
+ * Completely delete this log directory and all contents from the file system with no delay
*/
def delete(): Unit = {
logSegments.foreach(_.delete())
@@ -449,8 +455,7 @@ class Log(val dir: File,
truncateFullyAndStartAt(targetOffset)
} else {
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
- deletable.foreach(s => segments.remove(s.baseOffset))
- deletable.foreach(_.delete())
+ deletable.foreach(deleteSegment(_))
activeSegment.truncateTo(targetOffset)
this.nextOffset.set(targetOffset)
}
@@ -465,8 +470,7 @@ class Log(val dir: File,
debug("Truncate and start log '" + name + "' to " + newOffset)
lock synchronized {
val segmentsToDelete = logSegments.toList
- segments.clear()
- segmentsToDelete.foreach(_.delete())
+ segmentsToDelete.foreach(deleteSegment(_))
segments.put(newOffset,
new LogSegment(dir,
newOffset,
@@ -493,6 +497,41 @@ class Log(val dir: File,
override def toString() = "Log(" + this.dir + ")"
+ /**
+ * This method performs an asynchronous log segment delete by doing the following:
+ *
+ * - It removes the segment from the segment map so that it will no longer be used for reads.
+ *
+ * - It renames the index and log files by appending .deleted to the name
+ *
+ * - It schedules an asynchronous delete operation to occur in the future
+ *
+ * This allows reads to happen concurrently without synchronization and without the possibility of physically
+ * deleting a file while it is being read from.
+ *
+ * @param segment The log segment to schedule for deletion
+ */
+ private def deleteSegment(segment: LogSegment) {
+ debug("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, dir.getName))
+ lock synchronized {
+ segments.remove(segment.baseOffset)
+ val deletedLog = new File(segment.log.file.getPath + Log.DeletedFileSuffix)
+ val deletedIndex = new File(segment.index.file.getPath + Log.DeletedFileSuffix)
+ val renamedLog = segment.log.file.renameTo(deletedLog)
+ val renamedIndex = segment.index.file.renameTo(deletedIndex)
+ if(!renamedLog && segment.log.file.exists)
+ throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.log.file.getPath, deletedLog.getPath, name))
+ if(!renamedIndex && segment.index.file.exists)
+ throw new KafkaStorageException("Failed to rename file %s to %s for log %s.".format(segment.index.file.getPath, deletedIndex.getPath, name))
+ def asyncDeleteFiles() {
+ debug("Deleting log segment %s for log %s.".format(segment.baseOffset, name))
+ if(!deletedLog.delete())
+ warn("Failed to delete log segment file %s for log %s.".format(deletedLog.getPath, name))
+ if(!deletedIndex.delete())
+ warn("Failed to delete index segment file %s for log %s.".format(deletedLog.getPath, name))
+ }
+ scheduler.schedule("delete-log-segment", asyncDeleteFiles, delay = segmentDeleteDelayMs)
+ }
+ }
+
}
/**
@@ -501,6 +540,7 @@ class Log(val dir: File,
object Log {
val LogFileSuffix = ".log"
val IndexFileSuffix = ".index"
+ val DeletedFileSuffix = ".deleted"
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c5e0e81..b8cc3be 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -36,9 +36,9 @@ import kafka.server.KafkaConfig
* A background thread handles log retention by periodically truncating excess log segments.
*/
@threadsafe
-private[kafka] class LogManager(val config: KafkaConfig,
- scheduler: Scheduler,
- private val time: Time) extends Logging {
+class LogManager(val config: KafkaConfig,
+ scheduler: Scheduler,
+ private val time: Time) extends Logging {
val CleanShutdownFile = ".kafka_cleanshutdown"
val LockFile = ".lock"
@@ -116,7 +116,8 @@ private[kafka] class LogManager(val config: KafkaConfig,
val topicPartition = parseTopicPartitionName(dir.getName)
val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
- val log = new Log(dir,
+ val log = new Log(dir,
+ scheduler,
maxLogFileSize,
config.maxMessageSize,
logFlushInterval,
@@ -124,6 +125,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
needsRecovery,
config.logIndexMaxSizeBytes,
config.logIndexIntervalBytes,
+ config.logDeleteDelayMs,
time)
val previous = this.logs.put(topicPartition, log)
if(previous != null)
@@ -198,6 +200,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
log = new Log(dir,
+ scheduler,
maxLogFileSize,
config.maxMessageSize,
logFlushInterval,
@@ -205,6 +208,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
needsRecovery = false,
config.logIndexMaxSizeBytes,
config.logIndexIntervalBytes,
+ config.logDeleteDelayMs,
time)
info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
logs.put(topicAndPartition, log)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1431dbc..13b2484 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -115,6 +115,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the number of messages accumulated on a log partition before messages are flushed to disk */
val flushInterval = props.getIntInRange("log.flush.interval", 500, (1, Int.MaxValue))
+
+ val logDeleteDelayMs = props.getLongInRange("log.segment.delete.delay.ms", 60000, (0, Long.MaxValue))
/* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000 */
val flushIntervalMap = props.getMap("topic.flush.intervals.ms", _.toInt > 0).mapValues(_.toInt)
diff --git a/core/src/test/scala/other/kafka/TestLogPerformance.scala b/core/src/test/scala/other/kafka/TestLogPerformance.scala
index 75c33e0..67fb168 100644
--- a/core/src/test/scala/other/kafka/TestLogPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestLogPerformance.scala
@@ -18,7 +18,7 @@
package kafka.log
import kafka.message._
-import kafka.utils.{SystemTime, TestUtils, Utils}
+import kafka.utils.{SystemTime, TestUtils, Utils, KafkaScheduler}
import kafka.server.KafkaConfig
object TestLogPerformance {
@@ -33,7 +33,8 @@ object TestLogPerformance {
val props = TestUtils.createBrokerConfig(0, -1)
val config = new KafkaConfig(props)
val dir = TestUtils.tempDir()
- val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, time = SystemTime)
+ val scheduler = new KafkaScheduler(1)
+ val log = new Log(dir, scheduler, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, needsRecovery = false, segmentDeleteDelayMs = 0, time = SystemTime)
val bytes = new Array[Byte](messageSize)
new java.util.Random().nextBytes(bytes)
val message = new Message(bytes)
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 8ba5c48..7fc154f 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -85,19 +85,19 @@ class LogManagerTest extends JUnit3Suite {
def testCleanupExpiredSegments() {
val log = logManager.getOrCreateLog(name, 0)
var offset = 0L
- for(i <- 0 until 1000) {
+ for(i <- 0 until 200) {
var set = TestUtils.singleMessageSet("test".getBytes())
val info = log.append(set)
offset = info.lastOffset
}
-
assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
-
- // update the last modified time of all log segments
+
log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
-
+
time.sleep(maxLogAgeHours*60*60*1000 + 1)
- assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
+ assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
+ time.sleep(log.segmentDeleteDelayMs + 1)
+ assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
try {
@@ -131,18 +131,21 @@ class LogManagerTest extends JUnit3Suite {
var offset = 0L
// add a bunch of messages that should be larger than the retentionSize
- for(i <- 0 until 1000) {
+ val numMessages = 200
+ for(i <- 0 until numMessages) {
val set = TestUtils.singleMessageSet("test".getBytes())
val info = log.append(set)
offset = info.firstOffset
}
// should be exactly 100 full segments + 1 new empty one
- assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
+ assertEquals("Check we have the expected number of segments.", numMessages * setSize / config.logFileSize, log.numberOfSegments)
// this cleanup shouldn't find any expired segments but should delete some to reduce size
time.sleep(logManager.InitialTaskDelayMs)
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
+ time.sleep(log.segmentDeleteDelayMs + 1)
+ assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
try {
log.read(0, 1024)
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 87a89ee..fbd738a 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -65,7 +65,7 @@ class LogTest extends JUnitSuite {
val time: MockTime = new MockTime()
// create a log
- val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
time.sleep(rollMs + 1)
// segment age is less than its limit
@@ -98,7 +98,7 @@ class LogTest extends JUnitSuite {
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
- val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
// segments expire in size
@@ -114,7 +114,7 @@ class LogTest extends JUnitSuite {
@Test
def testLoadEmptyLog() {
createEmptyLogs(logDir, 0)
- val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
log.append(TestUtils.singleMessageSet("test".getBytes))
}
@@ -123,7 +123,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testAppendAndReadWithSequentialOffsets() {
- val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray
for(i <- 0 until messages.length)
@@ -142,7 +142,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testAppendAndReadWithNonSequentialOffsets() {
- val log = new Log(logDir, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 71, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray
val messages = messageIds.map(id => new Message(id.toString.getBytes))
@@ -165,7 +165,7 @@ class LogTest extends JUnitSuite {
*/
@Test
def testReadAtLogGap() {
- val log = new Log(logDir, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 300, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
// keep appending until we have two segments with only a single message in the second segment
while(log.numberOfSegments == 1)
@@ -185,7 +185,7 @@ class LogTest extends JUnitSuite {
@Test
def testReadOutOfRange() {
createEmptyLogs(logDir, 1024)
- val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
try {
log.read(0, 1024)
@@ -208,7 +208,7 @@ class LogTest extends JUnitSuite {
@Test
def testLogRolls() {
/* create a multipart log with 100 messages */
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
val numMessages = 100
val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes))
val offsets = messageSets.map(log.append(_).firstOffset)
@@ -232,7 +232,7 @@ class LogTest extends JUnitSuite {
@Test
def testCompressedMessages() {
/* this log should roll after every messageset */
- val log = new Log(logDir, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 10, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
/* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */
log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
@@ -255,7 +255,7 @@ class LogTest extends JUnitSuite {
for(messagesToAppend <- List(0, 1, 25)) {
logDir.mkdirs()
// first test a log segment starting at 0
- val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
for(i <- 0 until messagesToAppend)
log.append(TestUtils.singleMessageSet(i.toString.getBytes))
@@ -289,7 +289,7 @@ class LogTest extends JUnitSuite {
// append messages to log
val maxMessageSize = second.sizeInBytes - 1
- val log = new Log(logDir, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, 100, maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, time = time)
// should be able to append the small message
log.append(first)
@@ -311,7 +311,7 @@ class LogTest extends JUnitSuite {
val messageSize = 100
val segmentSize = 7 * messageSize
val indexInterval = 3 * messageSize
- var log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+ var log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
@@ -320,14 +320,14 @@ class LogTest extends JUnitSuite {
log.close()
// test non-recovery case
- log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+ log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = false, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
log.close()
// test recovery case
- log = new Log(logDir, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
+ log = new Log(logDir, time.scheduler, segmentSize, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = indexInterval, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
@@ -341,7 +341,7 @@ class LogTest extends JUnitSuite {
def testIndexRebuild() {
// publish the messages and close the log
val numMessages = 200
- var log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+ var log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
val indexFiles = log.logSegments.map(_.index.file)
@@ -351,7 +351,7 @@ class LogTest extends JUnitSuite {
indexFiles.foreach(_.delete())
// reopen the log
- log = new Log(logDir, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
+ log = new Log(logDir, time.scheduler, 200, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, needsRecovery = true, indexIntervalBytes = 1, maxIndexSize = 4096)
assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
for(i <- 0 until numMessages)
@@ -370,7 +370,7 @@ class LogTest extends JUnitSuite {
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
// create a log
- val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
@@ -422,7 +422,7 @@ class LogTest extends JUnitSuite {
val setSize = set.sizeInBytes
val msgPerSeg = 10
val logFileSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages
- val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
+ val log = new Log(logDir, time.scheduler, logFileSize, config.maxMessageSize, 1000, 10000, needsRecovery = false, time = time)
assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
for (i<- 1 to msgPerSeg)
log.append(set)
@@ -448,6 +448,7 @@ class LogTest extends JUnitSuite {
// create a log
var log = new Log(logDir,
+ time.scheduler,
maxSegmentSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxIndexSize = 1000,
@@ -459,6 +460,7 @@ class LogTest extends JUnitSuite {
log.append(set)
log.close()
log = new Log(logDir,
+ time.scheduler,
maxSegmentSize = set.sizeInBytes * 5,
maxMessageSize = config.maxMessageSize,
maxIndexSize = 1000,
@@ -469,4 +471,68 @@ class LogTest extends JUnitSuite {
assertEquals("Log end offset should be 3.", 3, log.logEndOffset)
}
+ /**
+ * Test that deleted files are deleted after the appropriate time.
+ */
+ @Test
+ def testAsyncDelete() {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ val asyncDeleteMs = 1000
+ val log = new Log(logDir,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
+ maxMessageSize = config.maxMessageSize,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
+ segmentDeleteDelayMs = asyncDeleteMs,
+ needsRecovery = true)
+
+ // append some messages to create some segments
+ for(i <- 0 until 100)
+ log.append(set)
+
+ // files should be renamed
+ val segments = log.logSegments.toArray
+ log.deleteOldSegments((s) => true)
+
+ assertEquals("Only one segment should remain.", 1, log.numberOfSegments)
+ val renamed = segments.map(segment => new File(segment.log.file.getPath + Log.DeletedFileSuffix))
+ assertTrue("Files should all be renamed to .deleted.", renamed.forall(_.exists))
+
+ // when enough time passes the files should be deleted
+ time.sleep(asyncDeleteMs + 1)
+ assertTrue("Files should all be gone.", renamed.forall(!_.exists))
+ }
+
+ /**
+ * Any files ending in .deleted should be removed when the log is re-opened.
+ */
+ @Test
+ def testOpenDeletesObsoleteFiles() {
+ val set = TestUtils.singleMessageSet("test".getBytes())
+ var log = new Log(logDir,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
+ maxMessageSize = config.maxMessageSize,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
+ needsRecovery = false)
+
+ // append some messages to create some segments
+ for(i <- 0 until 100)
+ log.append(set)
+
+ log.deleteOldSegments((s) => true)
+ log.close()
+
+ log = new Log(logDir,
+ time.scheduler,
+ maxSegmentSize = set.sizeInBytes * 5,
+ maxMessageSize = config.maxMessageSize,
+ maxIndexSize = 1000,
+ indexIntervalBytes = 10000,
+ needsRecovery = false)
+ assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index accb7b2..883442d 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -17,8 +17,8 @@
package kafka.message
-import java.util._
import java.nio._
+import java.util.HashMap
import scala.collection._
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
@@ -45,7 +45,7 @@ class MessageTest extends JUnitSuite {
}
@Test
- def testFieldValues = {
+ def testFieldValues {
for(v <- messages) {
TestUtils.checkEquals(ByteBuffer.wrap(v.payload), v.message.payload)
assertEquals(Message.CurrentMagicValue, v.message.magic)
@@ -69,7 +69,7 @@ class MessageTest extends JUnitSuite {
}
@Test
- def testEquality() = {
+ def testEquality() {
for(v <- messages) {
assertFalse("Should not equal null", v.message.equals(null))
assertFalse("Should not equal a random string", v.message.equals("asdf"))
@@ -80,7 +80,7 @@ class MessageTest extends JUnitSuite {
}
@Test
- def testIsHashable() = {
+ def testIsHashable() {
// this is silly, but why not
val m = new HashMap[Message, Message]()
for(v <- messages)
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 724ffcf..4853f2b 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -16,7 +16,7 @@
*/
package kafka.utils
-import scala.collection._
+import scala.collection.mutable.PriorityQueue
import java.util.concurrent.TimeUnit
/**
@@ -30,16 +30,13 @@ import java.util.concurrent.TimeUnit
* time.sleep(1001) // this should cause our scheduled task to fire
*
*
- * Two gotchas:
- *
- * - Incrementing the time by more than one task period will result in the correct number of executions of each scheduled task
- * but the order of these executions is not specified.
- *
- * - Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time)
- *
+ * Incrementing the time to the exact next execution time of a task will result in that task executing (it is as if execution itself takes no time).
*/
+@nonthreadsafe
class MockScheduler(val time: Time) extends Scheduler {
- var tasks = mutable.ArrayBuffer[MockScheduled]()
+ /* a priority queue of tasks ordered by next execution time */
+ var tasks = new PriorityQueue[MockTask]()
def startup() {}
@@ -50,34 +47,38 @@ class MockScheduler(val time: Time) extends Scheduler {
/**
* Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
* when this method is called and the execution happens synchronously in the calling thread.
- * If you are using the scheduler associated with a MockTime instance this call will happen automatically.
+ * If you are using the scheduler associated with a MockTime instance this call will be triggered automatically.
*/
def tick() {
- var tasks = mutable.ArrayBuffer[MockScheduled]()
val now = time.milliseconds
- for(task <- this.tasks) {
- if(task.nextExecution <= now) {
- if(task.period >= 0) {
- val executions = (now - task.nextExecution) / task.period
- for(i <- 0 to executions.toInt)
- task.fun()
- task.nextExecution += (executions + 1) * task.period
- tasks += task
- } else {
- task.fun()
- }
- } else {
- tasks += task
+ while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+ /* pop and execute the task with the lowest next execution time */
+ val curr = tasks.head
+ this.tasks = tasks.tail
+ curr.fun()
+ /* if the task is periodic, reschedule it and re-enqueue */
+ if(curr.periodic) {
+ curr.nextExecution += curr.period
+ this.tasks += curr
}
}
- this.tasks = tasks
}
def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
- tasks += MockScheduled(name, fun, time.milliseconds + delay, period = period)
+ tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
tick()
}
}
-case class MockScheduled(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long)
\ No newline at end of file
+case class MockTask(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long) extends Ordered[MockTask] {
+ def periodic = period >= 0
+ def compare(t: MockTask): Int = {
+ if(t.nextExecution == nextExecution)
+ return 0
+ else if (t.nextExecution < nextExecution)
+ return -1
+ else
+ return 1
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
index ae16a71..ec27ef9 100644
--- a/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
@@ -67,6 +67,13 @@ class SchedulerTest {
}
@Test
+ def testReentrantTaskInMockScheduler() {
+ mockTime.scheduler.schedule("test1", () => mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=0), delay=1)
+ mockTime.sleep(1)
+ assertEquals(1, counter2.get)
+ }
+
+ @Test
def testNonPeriodicTask() {
scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
retry(30000, () => assertEquals(counter1.get, 1))