Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala (revision 6af334d69ade5f4f21a89b8b2a8e0e881ed51359)
+++ core/src/main/scala/kafka/log/LogManager.scala (revision )
@@ -51,6 +51,7 @@
   private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervalMap = config.flushIntervalMap
   private val logRetentionMSMap = getLogRetentionMSMap(config.logRetentionHoursMap)
+  private val logRetentionSize = config.logRetentionSize
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -193,7 +194,52 @@
     log
   }
 
+  /* Attempts to delete all provided segments from a log and returns how many it was able to delete */
+  private def deleteSegments(log: Log, segments: Seq[LogSegment]): Int = {
+    var total = 0
+    for(segment <- segments) {
+      logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
+      Utils.swallow(logger.warn, segment.messageSet.close())
+      if(!segment.file.delete()) {
+        logger.warn("Delete failed.")
+      } else {
+        total += 1
+      }
+    }
+    total
+  }
+
+  /* Runs through the log removing segments older than a certain age */
+  private def cleanupExpiredSegments(log: Log): Int = {
+    val startMs = time.milliseconds
+    val topic = Utils.getTopicPartition(log.dir.getName)._1
+    val logCleanupThresholdMS = logRetentionMSMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
+    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
+    val total = deleteSegments(log, toBeDeleted)
+    total
+  }
+
+  /**
+   *  Runs through the log removing segments while the remaining log
+   *  is still at least logRetentionSize bytes in size
+   */
+  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
+    if(logRetentionSize < 0 || log.size < logRetentionSize) return 0
+    var diff = log.size - logRetentionSize
+    def shouldDelete(segment: LogSegment) = {
+      if(diff - segment.size >= 0) {
+        diff -= segment.size
+        true
+      } else {
+        false
+      }
+    }
+    val toBeDeleted = log.markDeletedWhile(shouldDelete)
+    val total = deleteSegments(log, toBeDeleted)
+    total
+  }
+
   /**
    * Delete any eligible logs. Return the number of segments deleted.
    */
   def cleanupLogs() {
@@ -204,20 +250,8 @@
     while(iter.hasNext) {
       val log = iter.next
       logger.debug("Garbage collecting '" + log.name + "'")
-      var logCleanupThresholdMS = this.logCleanupDefaultAgeMs
-      val topic = Utils.getTopicPartition(log.dir.getName)._1
-      if (logRetentionMSMap.contains(topic))
-        logCleanupThresholdMS = logRetentionMSMap(topic)
-      val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMS)
-      for(segment <- toBeDeleted) {
-        logger.info("Deleting log segment " + segment.file.getName() + " from " + log.name)
-        Utils.swallow(logger.warn, segment.messageSet.close())
-        if(!segment.file.delete())
-          logger.warn("Delete failed.")
-        else
-          total += 1
+      total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
-      }
-    }
+    }
     logger.debug("Log cleanup completed. " + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds")
   }

Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 6af334d69ade5f4f21a89b8b2a8e0e881ed51359)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala (revision )
@@ -64,6 +64,9 @@
   /* the number of hours to keep a log file before deleting it */
   val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
 
+  /* the maximum size of the log before deleting old segments */
+  val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+
   /* the number of hours to keep a log file before deleting it for some specific topic*/
   val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))

Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala (revision 6af334d69ade5f4f21a89b8b2a8e0e881ed51359)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala (revision )
@@ -59,7 +59,7 @@
 
   @Test
-  def testCleanup() {
+  def testCleanupExpiredSegments() {
     val log = logManager.getOrCreateLog("cleanup", 0)
     var offset = 0L
     for(i <- 0 until 1000) {
@@ -87,6 +87,54 @@
   }
 
   @Test
+  def testCleanupSegmentsToMaintainSize() {
+    val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
+    val retentionHours = 1
+    val retentionMs = 1000 * 60 * 60 * retentionHours
+    val props = TestUtils.createBrokerConfig(0, -1)
+    logManager.close
+    Thread.sleep(100)
+    config = new KafkaConfig(props) {
+      override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will hold 10 messages
+      override val enableZookeeper = false
+      override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over
+      override val logRetentionHours = retentionHours
+    }
+    logManager = new LogManager(config, null, time, -1, retentionMs, false)
+    logManager.startup
+
+    // create a log
+    val log = logManager.getOrCreateLog("cleanup", 0)
+    var offset = 0L
+
+    // add a bunch of messages that should be larger than the retentionSize
+    for(i <- 0 until 1000) {
+      val set = TestUtils.singleMessageSet("test".getBytes())
+      log.append(set)
+      offset += set.sizeInBytes
+    }
+    // flush to make sure it's written to disk, then wait a moment before running cleanup
+    log.flush
+    Thread.sleep(2000)
+
+    // should be exactly 100 full segments + 1 new empty one
+    assertEquals("There should be exactly 101 segments.", 100 + 1, log.numberOfSegments)
+
+    // this cleanup shouldn't find any expired segments but should delete some to reduce size
+    logManager.cleanupLogs()
+    assertEquals("Now there should be exactly 7 segments.", 6 + 1, log.numberOfSegments)
+    assertEquals("Should get empty fetch off new log.", 0L, log.read(offset, 1024).sizeInBytes)
+    try {
+      log.read(0, 1024)
+      fail("Should get exception from fetching earlier offsets.")
+    } catch {
+      case e: OffsetOutOfRangeException => "This is good."
+    }
+    // log should still be appendable
+    log.append(TestUtils.singleMessageSet("test".getBytes()))
+  }
+
+  @Test
  def testTimeBasedFlush() {
    val props = TestUtils.createBrokerConfig(0, -1)
    logManager.close