From ec7b9bfaf2006d005822fc2a76df6f1821f28e5d Mon Sep 17 00:00:00 2001 From: jholoman Date: Sun, 19 Apr 2015 13:40:38 -0400 Subject: [PATCH] KAFKA-1990 --- core/src/main/scala/kafka/log/LogConfig.scala | 11 ++- core/src/main/scala/kafka/log/LogManager.scala | 2 + core/src/main/scala/kafka/server/KafkaConfig.scala | 24 ++++-- core/src/test/scala/kafka/log/LogConfigTest.scala | 95 --------------------- .../test/scala/unit/kafka/admin/AdminTest.scala | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 99 ++++++++++++++++++++++ .../scala/unit/kafka/server/KafkaConfigTest.scala | 53 ++++++++++++ 7 files changed, 175 insertions(+), 111 deletions(-) delete mode 100644 core/src/test/scala/kafka/log/LogConfigTest.scala create mode 100644 core/src/test/scala/unit/kafka/log/LogConfigTest.scala diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 558c703..da55a34 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -21,8 +21,6 @@ import java.util.Properties import org.apache.kafka.common.utils.Utils import scala.collection._ import org.apache.kafka.common.config.ConfigDef -import kafka.common._ -import scala.collection.JavaConversions._ import kafka.message.BrokerCompressionCodec object Defaults { @@ -93,7 +91,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(FlushMessagesProp, flushInterval.toString) props.put(FlushMsProp, flushMs.toString) props.put(RetentionBytesProp, retentionSize.toString) - props.put(RententionMsProp, retentionMs.toString) + props.put(RetentionMsProp, retentionMs.toString) props.put(MaxMessageBytesProp, maxMessageSize.toString) props.put(IndexIntervalBytesProp, indexInterval.toString) props.put(DeleteRetentionMsProp, deleteRetentionMs.toString) @@ -122,7 +120,7 @@ object LogConfig { val FlushMessagesProp = "flush.messages" val FlushMsProp = "flush.ms" val RetentionBytesProp = "retention.bytes" - val RententionMsProp = "retention.ms" + val RetentionMsProp = "retention.ms" val MaxMessageBytesProp = "max.message.bytes" val IndexIntervalBytesProp = "index.interval.bytes" val DeleteRetentionMsProp = "delete.retention.ms" @@ -172,7 +170,8 @@ object LogConfig { .define(FlushMsProp, LONG, Defaults.FlushMs, atLeast(0), MEDIUM, FlushMsDoc) // can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize .define(RetentionBytesProp, LONG, Defaults.RetentionSize, MEDIUM, RetentionSizeDoc) - .define(RententionMsProp, LONG, Defaults.RetentionMs, atLeast(0), MEDIUM, RetentionMsDoc) + // can be negative. See kafka.log.LogManager.cleanupExpiredSegments + .define(RetentionMsProp, LONG, Defaults.RetentionMs, MEDIUM, RetentionMsDoc) .define(MaxMessageBytesProp, INT, Defaults.MaxMessageSize, atLeast(0), MEDIUM, MaxMessageSizeDoc) .define(IndexIntervalBytesProp, INT, Defaults.IndexInterval, atLeast(0), MEDIUM, IndexIntervalDoc) .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM, DeleteRetentionMsDoc) @@ -206,7 +205,7 @@ object LogConfig { flushInterval = parsed.get(FlushMessagesProp).asInstanceOf[Long], flushMs = parsed.get(FlushMsProp).asInstanceOf[Long], retentionSize = parsed.get(RetentionBytesProp).asInstanceOf[Long], - retentionMs = parsed.get(RententionMsProp).asInstanceOf[Long], + retentionMs = parsed.get(RetentionMsProp).asInstanceOf[Long], maxMessageSize = parsed.get(MaxMessageBytesProp).asInstanceOf[Int], indexInterval = parsed.get(IndexIntervalBytesProp).asInstanceOf[Int], fileDeleteDelayMs = parsed.get(FileDeleteDelayMsProp).asInstanceOf[Long], diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a7a9b85..e781eba 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -407,6 +407,8 @@ class LogManager(val logDirs: Array[File], * Runs through the log removing segments older than a certain age */ private def cleanupExpiredSegments(log: Log): Int = { + if (log.config.retentionMs < 0) + return 0 val startMs = time.milliseconds log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 69b772c..cfbbd2b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -417,7 +417,7 @@ object KafkaConfig { .define(LogRetentionTimeMillisProp, LONG, HIGH, LogRetentionTimeMillisDoc, false) .define(LogRetentionTimeMinutesProp, INT, HIGH, LogRetentionTimeMinsDoc, false) - .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, atLeast(1), HIGH, LogRetentionTimeHoursDoc) + .define(LogRetentionTimeHoursProp, INT, Defaults.LogRetentionHours, HIGH, LogRetentionTimeHoursDoc) .define(LogRetentionBytesProp, LONG, Defaults.LogRetentionBytes, HIGH, LogRetentionBytesDoc) .define(LogCleanupIntervalMsProp, LONG, Defaults.LogCleanupIntervalMs, atLeast(1), MEDIUM, LogCleanupIntervalMsDoc) @@ -770,12 +770,16 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute - _logRetentionTimeMillis.getOrElse( - _logRetentionTimeMins match { - case Some(mins) => millisInMinute * mins - case None => millisInHour * logRetentionTimeHours - } - ) + val millis = { + _logRetentionTimeMillis.getOrElse( + _logRetentionTimeMins match { + case Some(mins) => millisInMinute * mins + case None => millisInHour * logRetentionTimeHours + } + ) + } + if (millis < 0) return -1 + millis } private def getMap(propName: String, propValue: String): Map[String, String] = { @@ -834,8 +838,10 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id") require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1") require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0") - require(logRetentionTimeMillis >= 1, "log.retention.ms must be equal or greater than 1") - require(_logRetentionTimeMins.forall(_ >= 1), "log.retention.minutes must be equal or greater than 1") + + require(_logRetentionTimeMins.forall(_ >= 1)|| _logRetentionTimeMins.forall(_ .equals(-1)), "log.retention.minutes must be unlimited (-1) or, equal or greater than 1") + require(logRetentionTimeHours >= 1 || logRetentionTimeHours == -1, "log.retention.hours must be unlimited (-1) or, equal or greater than 1") + require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1") require(logDirs.size > 0) require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.") diff --git a/core/src/test/scala/kafka/log/LogConfigTest.scala b/core/src/test/scala/kafka/log/LogConfigTest.scala deleted file mode 100644 index 9690f14..0000000 --- a/core/src/test/scala/kafka/log/LogConfigTest.scala +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import org.apache.kafka.common.config.ConfigException -import org.scalatest.junit.JUnit3Suite -import org.junit.{Assert, Test} -import java.util.Properties - -class LogConfigTest extends JUnit3Suite { - - @Test - def testFromPropsDefaults() { - val defaults = new Properties() - defaults.put(LogConfig.SegmentBytesProp, "4242") - val props = new Properties(defaults) - - val config = LogConfig.fromProps(props) - - Assert.assertEquals(4242, config.segmentSize) - Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize) - Assert.assertEquals("producer", config.compressionType) - } - - @Test - def testFromPropsEmpty() { - val p = new Properties() - val config = LogConfig.fromProps(p) - Assert.assertEquals(LogConfig(), config) - } - - @Test - def testFromPropsToProps() { - import scala.util.Random._ - val expected = new Properties() - LogConfig.configNames().foreach((name) => { - name match { - case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false")) - case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip")) - case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete)) - case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) - case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString) - case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString) - case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) - } - }) - - val actual = LogConfig.fromProps(expected).toProps - Assert.assertEquals(expected, actual) - } - - @Test - def testFromPropsInvalid() { - LogConfig.configNames().foreach((name) => { - name match { - case LogConfig.UncleanLeaderElectionEnableProp => return - case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") - case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); - case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") - case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1") - case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1") - } - }) - } - - private def assertPropertyInvalid(name: String, values: AnyRef*) { - values.foreach((value) => { - val props = new Properties - props.setProperty(name, value.toString) - intercept[ConfigException] { - LogConfig.fromProps(props) - } - }) - } - - private def randFrom[T](choices: T*): T = { - import scala.util.Random - choices(Random.nextInt(choices.size)) - } -} diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index cfe38df..4b728a1 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -370,7 +370,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { def makeConfig(messageSize: Int, retentionMs: Long) = { var props = new Properties() props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString) - props.setProperty(LogConfig.RententionMsProp, retentionMs.toString) + props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString) props } diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala new file mode 100644 index 0000000..f3546ad --- /dev/null +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.log + +import java.util.Properties + +import kafka.log.{Defaults, LogConfig} +import org.apache.kafka.common.config.ConfigException +import org.junit.{Assert, Test} +import org.scalatest.junit.JUnit3Suite + +class LogConfigTest extends JUnit3Suite { + + @Test + def testFromPropsDefaults() { + val defaults = new Properties() + defaults.put(LogConfig.SegmentBytesProp, "4242") + val props = new Properties(defaults) + + val config = LogConfig.fromProps(props) + + Assert.assertEquals(4242, config.segmentSize) + Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize) + Assert.assertEquals("producer", config.compressionType) + } + + @Test + def testFromPropsEmpty() { + val p = new Properties() + val config = LogConfig.fromProps(p) + Assert.assertEquals(LogConfig(), config) + } + + @Test + def testFromPropsToProps() { + import scala.util.Random._ + val expected = new Properties() + LogConfig.configNames().foreach((name) => { + name match { + case LogConfig.UncleanLeaderElectionEnableProp => expected.setProperty(name, randFrom("true", "false")) + case LogConfig.CompressionTypeProp => expected.setProperty(name, randFrom("producer", "uncompressed", "gzip")) + case LogConfig.CleanupPolicyProp => expected.setProperty(name, randFrom(LogConfig.Compact, LogConfig.Delete)) + case LogConfig.MinCleanableDirtyRatioProp => expected.setProperty(name, "%.1f".format(nextDouble * .9 + .1)) + case LogConfig.MinInSyncReplicasProp => expected.setProperty(name, (nextInt(Int.MaxValue - 1) + 1).toString) + case LogConfig.RetentionBytesProp => expected.setProperty(name, nextInt().toString) + case LogConfig.RetentionMsProp => expected.setProperty(name, nextLong().toString) + case positiveIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) + } + }) + + val actual = LogConfig.fromProps(expected).toProps + Assert.assertEquals(expected, actual) + } + + @Test + def testFromPropsInvalid() { + LogConfig.configNames().foreach((name) => { + name match { + case LogConfig.UncleanLeaderElectionEnableProp => return + case LogConfig.RetentionBytesProp => assertPropertyInvalid(name, "not_a_number") + case LogConfig.RetentionMsProp => assertPropertyInvalid(name, "not_a_number" ) + case LogConfig.CleanupPolicyProp => assertPropertyInvalid(name, "true", "foobar"); + case LogConfig.MinCleanableDirtyRatioProp => assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2") + case LogConfig.MinInSyncReplicasProp => assertPropertyInvalid(name, "not_a_number", "0", "-1") + case positiveIntProperty => assertPropertyInvalid(name, "not_a_number", "-1") + } + }) + } + + private def assertPropertyInvalid(name: String, values: AnyRef*) { + values.foreach((value) => { + val props = new Properties + props.setProperty(name, value.toString) + intercept[ConfigException] { + LogConfig.fromProps(props) + } + }) + } + + private def randFrom[T](choices: T*): T = { + import scala.util.Random + choices(Random.nextInt(choices.size)) + } +} diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 40c265a..2428dbd 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -83,6 +83,59 @@ class KafkaConfigTest extends JUnit3Suite { val cfg = KafkaConfig.fromProps(props) assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis) } + @Test + def testLogRetentionUnlimited() { + val props1 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props2 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props3 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) + + props1.put("log.retention.ms", "-1") + props2.put("log.retention.minutes", "-1") + props3.put("log.retention.hours", "-1") + + val cfg1 = KafkaConfig.fromProps(props1) + val cfg2 = KafkaConfig.fromProps(props2) + val cfg3 = KafkaConfig.fromProps(props3) + assertEquals("Should be -1", -1, cfg1.logRetentionTimeMillis) + assertEquals("Should be -1", -1, cfg2.logRetentionTimeMillis) + assertEquals("Should be -1", -1, cfg3.logRetentionTimeMillis) + + props4.put("log.retention.ms", "-1") + props4.put("log.retention.minutes", "30") + + val cfg4 = KafkaConfig.fromProps(props4) + assertEquals("Should be -1", -1, cfg4.logRetentionTimeMillis) + + props5.put("log.retention.ms", "0") + + intercept[IllegalArgumentException] { + val cfg5 = KafkaConfig.fromProps(props5) + } + } + + @Test + def testLogRetentionValid { + val props1 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) + + props1.put("log.retention.ms", "0") + props2.put("log.retention.minutes", "0") + props3.put("log.retention.hours", "0") + + intercept[IllegalArgumentException] { + val cfg1 = KafkaConfig.fromProps(props1) + } + intercept[IllegalArgumentException] { + val cfg2 = KafkaConfig.fromProps(props2) + } + intercept[IllegalArgumentException] { + val cfg3 = KafkaConfig.fromProps(props3) + } + + } @Test def testAdvertiseDefaults() { -- 2.3.0