Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log/LogManagerTest.scala (revision 1297324) +++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala (working copy) @@ -39,6 +39,7 @@ config = new KafkaConfig(props) { override val logFileSize = 1024 override val enableZookeeper = false + override val flushInterval = 100 } logManager = new LogManager(config, null, time, -1, maxLogAge, false) logManager.startup @@ -78,10 +79,13 @@ offset += set.sizeInBytes } log.flush - // Why this sleep is required ? File system takes some time to update the last modified time for a file. - // TODO: What is unknown is why 1 second or couple 100 milliseconds didn't work ? - Thread.sleep(2000) + assertTrue("There should be more than one segment now.", log.numberOfSegments > 1) + + // update the last modified time of all log segments + val logSegments = log.segments.view + logSegments.foreach(s => s.file.setLastModified(time.currentMs)) + time.currentMs += maxLogAge + 3000 logManager.cleanupLogs() assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments) @@ -109,6 +113,7 @@ override val enableZookeeper = false override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over override val logRetentionHours = retentionHours + override val flushInterval = 100 } logManager = new LogManager(config, null, time, -1, retentionMs, false) logManager.startup @@ -177,6 +182,7 @@ override val logFileSize = 256 override val enableZookeeper = false override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2") + override val flushInterval = 100 } logManager = new LogManager(config, null, time, -1, maxLogAge, false)