From 42af8e28a5f915da03e7075701fd1b66656c11b8 Mon Sep 17 00:00:00 2001
From: jholoman
Date: Tue, 10 Mar 2015 00:53:56 -0400
Subject: [PATCH] KAFKA-1990

---
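Notes:

This patch allows log retention time to be made unlimited by setting the
retention time to -1:

 * Broker level: log.retention.ms, log.retention.minutes and log.retention.hours
   now accept -1. The atLeast(1) constraint on log.retention.hours is dropped and
   the -1 case is instead validated explicitly alongside the other retention
   settings in KafkaConfig.
 * Topic level: retention.ms accepts -1, and LogManager.cleanupExpiredSegments()
   skips time-based deletion when retentionMs < 0, mirroring how a negative
   retention.bytes already disables size-based deletion.
 * Also fixes the misspelled RententionMsProp constant (now RetentionMsProp).

As a usage sketch only (not part of the patch), a broker that should keep data
indefinitely while still bounding partition size could be configured as:

  # illustrative server.properties excerpt: time-based retention disabled,
  # size-based retention still caps each partition log at ~1 GiB
  log.retention.ms=-1
  log.retention.bytes=1073741824
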
 core/src/main/scala/kafka/log/LogConfig.scala      | 11 ++---
 core/src/main/scala/kafka/log/LogManager.scala     |  9 +++-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 24 +++++----
 core/src/test/scala/kafka/log/LogConfigTest.scala  |  2 +
 .../test/scala/unit/kafka/admin/AdminTest.scala    |  2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 57 ++++++++++++++++++++--
 6 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 8b67aee..8984820 100644
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -21,8 +21,6 @@ import java.util.Properties
 import org.apache.kafka.common.utils.Utils
 import scala.collection._
 import org.apache.kafka.common.config.ConfigDef
-import kafka.common._
-import scala.collection.JavaConversions._
 import kafka.message.BrokerCompressionCodec
 
 object Defaults {
@@ -93,7 +91,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
     props.put(FlushMessagesProp, flushInterval.toString)
     props.put(FlushMsProp, flushMs.toString)
     props.put(RetentionBytesProp, retentionSize.toString)
-    props.put(RententionMsProp, retentionMs.toString)
+    props.put(RetentionMsProp, retentionMs.toString)
     props.put(MaxMessageBytesProp, maxMessageSize.toString)
     props.put(IndexIntervalBytesProp, indexInterval.toString)
     props.put(DeleteRetentionMsProp, deleteRetentionMs.toString)
@@ -122,7 +120,7 @@ object LogConfig {
   val FlushMessagesProp = "flush.messages"
   val FlushMsProp = "flush.ms"
   val RetentionBytesProp = "retention.bytes"
-  val RententionMsProp = "retention.ms"
+  val RetentionMsProp = "retention.ms"
   val MaxMessageBytesProp = "max.message.bytes"
   val IndexIntervalBytesProp = "index.interval.bytes"
   val DeleteRetentionMsProp = "delete.retention.ms"
@@ -172,7 +170,8 @@ object LogConfig {
       .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc)
       // can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize
       .define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc)
-      .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc)
+      // can be negative. See kafka.log.LogManager.cleanupExpiredSegments
+      .define(RetentionMsProp, LONG, Defaults.RetentionMs, MEDIUM, RetentionMsDoc)
       .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc)
       .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM, IndexIntervalDoc)
       .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc)
@@ -206,7 +205,7 @@ object LogConfig {
                   flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long],
                   flushMs = parsed.get(FlushMsProp).asInstanceOf[Long],
                   retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long],
-                  retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long],
+                  retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long],
                   maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int],
                   indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int],
                   fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long],
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 47d250a..aaec63a 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -404,9 +404,13 @@ class LogManager(val logDirs: Array[File],
   }
 
   /**
-   * Runs through the log removing segments older than a certain age
+   * Runs through the log removing segments older than a certain age, unless retention
+   * period is < 0, i.e. unlimited
    */
   private def cleanupExpiredSegments(log: Log): Int = {
+    if (log.config.retentionMs < 0) {
+      return 0
+    }
     val startMs = time.milliseconds
     log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
   }
@@ -416,8 +420,9 @@ class LogManager(val logDirs: Array[File],
    * is at least logRetentionSize bytes in size
    */
   private def cleanupSegmentsToMaintainSize(log: Log): Int = {
-    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
+    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) {
       return 0
+    }
     var diff = log.size - log.config.retentionSize
     def shouldDelete(segment: LogSegment) = {
       if(diff - segment.size >= 0) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 48e3362..f4f323f 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -394,7 +394,7 @@ object KafkaConfig {
 
       .define(LogRetentionTimeMillisProp, LONG, HIGH, LogRetentionTimeMillisDoc, false)
       .define(LogRetentionTimeMinutesProp, INT, HIGH, LogRetentionTimeMinsDoc, false)
-      .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, atLeast(1), HIGH, LogRetentionTimeHoursDoc)
+      .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, HIGH, LogRetentionTimeHoursDoc)
 
       .define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc)
       .define(LogCleanupIntervalMsProp, LONG, Defaults.LogCleanupIntervalMs, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc)
@@ -745,12 +745,17 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
 
-    _logRetentionTimeMillis.getOrElse(
-      _logRetentionTimeMins match {
-        case Some(mins) => millisInMinute * mins
-        case None => millisInHour * logRetentionTimeHours
-      }
-    )
+    val millis = {
+      _logRetentionTimeMillis.getOrElse(
+        if (_logRetentionTimeMins.isDefined) {
+          millisInMinute * _logRetentionTimeMins.get
+        } else {
+          millisInHour * logRetentionTimeHours
+        }
+      )
+    }
+    if (millis < 0) return -1
+    millis
   }
 
   validateValues()
@@ -759,8 +764,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
     require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
     require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
-    require(logRetentionTimeMillis >= 1, "log.retention.ms must be equal or greater than 1")
-    require(_logRetentionTimeMins.forall(_ >= 1), "log.retention.minutes must be equal or greater than 1")
+    require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1")
+    require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1")
+    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
     require(logDirs.size > 0)
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
 
diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala
index 9690f14..4c43fa2 100644
--- a/core/src/test/scala/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/kafka/log/LogConfigTest.scala
@@ -56,6 +56,7 @@ class LogConfigTest extends JUnit3Suite {
       case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1))
       case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString)
       case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString)
+      case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString)
       case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
     }
   })
@@ -70,6 +71,7 @@ class LogConfigTest extends JUnit3Suite {
     name match {
       case LogConfig.UncleanLeaderElectionEnableProp => return
       case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number")
+      case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" )
       case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar");
       case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2")
       case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1")
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index ee0b21e..25ac638 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -370,7 +370,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     def makeConfig(messageSize: Int, retentionMs: Long) = {
       var props = new Properties()
       props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
-      props.setProperty(LogConfig.RententionMsProp, retentionMs.toString)
+      props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
       props
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 7f47e6f..b79b987 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -22,8 +22,6 @@ import org.junit.Test
 import junit.framework.Assert._
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.TestUtils
-import kafka.message.GZIPCompressionCodec
-import kafka.message.NoCompressionCodec
 
 class KafkaConfigTest extends JUnit3Suite {
 
@@ -46,7 +44,6 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
 
-
   @Test
   def testLogRetentionTimeMsProvided() {
     val props = TestUtils.createBrokerConfig(0, 8181)
@@ -56,7 +53,6 @@ class KafkaConfigTest extends JUnit3Suite {
     assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
 
-
   @Test
   def testLogRetentionTimeNoConfigProvided() {
     val props = TestUtils.createBrokerConfig(0, 8181)
@@ -89,6 +85,59 @@ class KafkaConfigTest extends JUnit3Suite {
   }
 
   @Test
+  def testLogRetentionUnlimited() {
+    val props1 = TestUtils.createBrokerConfig(0,8181)
+    val props2 = TestUtils.createBrokerConfig(0,8181)
+    val props3 = TestUtils.createBrokerConfig(0,8181)
+    val props4 = TestUtils.createBrokerConfig(0,8181)
+    val props5 = TestUtils.createBrokerConfig(0,8181)
+
+    props1.put("log.retention.ms", "-1")
+    props2.put("log.retention.minutes", "-1")
+    props3.put("log.retention.hours", "-1")
+
+    val cfg1 = KafkaConfig.fromProps(props1)
+    val cfg2 = KafkaConfig.fromProps(props2)
+    val cfg3 = KafkaConfig.fromProps(props3)
+    assertEquals("Should be -1", -1, cfg1.logRetentionTimeMillis)
+    assertEquals("Should be -1", -1, cfg2.logRetentionTimeMillis)
+    assertEquals("Should be -1", -1, cfg3.logRetentionTimeMillis)
+
+    props4.put("log.retention.ms", "-1")
+    props4.put("log.retention.minutes", "30")
+
+    val cfg4 = KafkaConfig.fromProps(props4)
+    assertEquals("Should be -1", -1, cfg4.logRetentionTimeMillis)
+
+    props5.put("log.retention.ms", "0")
+
+    intercept[IllegalArgumentException] {
+      val cfg5 = KafkaConfig.fromProps(props5)
+    }
+  }
+
+  @Test
+  def testLogRetentionValid {
+    val props1 = TestUtils.createBrokerConfig(0,8181)
+    val props2 = TestUtils.createBrokerConfig(0,8181)
+    val props3 = TestUtils.createBrokerConfig(0,8181)
+
+    props1.put("log.retention.ms", "0")
+    props2.put("log.retention.minutes", "0")
+    props3.put("log.retention.hours", "0")
+
+    intercept[IllegalArgumentException] {
+      val cfg1 = KafkaConfig.fromProps(props1)
+    }
+    intercept[IllegalArgumentException] {
+      val cfg2 = KafkaConfig.fromProps(props2)
+    }
+    intercept[IllegalArgumentException] {
+      val cfg3 = KafkaConfig.fromProps(props3)
+    }
+
+  }
+  @Test
   def testAdvertiseDefaults() {
     val port = 9999
     val hostName = "fake-host"
-- 
2.3.0
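
Reviewer note (not part of the patch): the precedence among the broker-level settings
is unchanged (log.retention.ms overrides log.retention.minutes, which overrides
log.retention.hours), and a negative resolved value is normalized to -1, i.e. retain
forever, as exercised by testLogRetentionUnlimited above. A minimal, self-contained
Scala sketch of that resolution logic (the object and parameter names below are
illustrative, not the actual KafkaConfig API):

  // Sketch of the retention resolution performed in KafkaConfig:
  // ms beats minutes beats hours; any negative result means "unlimited" (-1).
  object RetentionResolutionSketch {
    def logRetentionTimeMillis(retentionMs: Option[Long],
                               retentionMinutes: Option[Int],
                               retentionHours: Int): Long = {
      val millisInMinute = 60L * 1000L
      val millisInHour = 60L * millisInMinute
      val millis = retentionMs.getOrElse(
        retentionMinutes match {
          case Some(mins) => millisInMinute * mins
          case None       => millisInHour * retentionHours
        })
      if (millis < 0) -1L else millis
    }

    def main(args: Array[String]): Unit = {
      // log.retention.ms=-1 wins over log.retention.minutes=30: unlimited
      println(logRetentionTimeMillis(Some(-1L), Some(30), 168))  // prints -1
      // only log.retention.hours=168 (7 days) in effect
      println(logRetentionTimeMillis(None, None, 168))           // prints 604800000
    }
  }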