diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ebd171f..74442b6 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -31,6 +31,17 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro this(new VerifiableProperties(originalProps)) props.verify() } + + private def getLogRetentionTimeMillis(): Long = { + var millisInMinute = 60L * 1000L + val millisInHour = 60L * millisInMinute + if(props.containsKey("log.retention.minutes")){ + millisInMinute * props.getIntInRange("log.retention.minutes", (1, Int.MaxValue)) + } else { + millisInHour * props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) + } + + } /*********** General Configuration ***********/ @@ -92,7 +103,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro val logRollHoursPerTopicMap = props.getMap("log.roll.hours.per.topic", _.toInt > 0).mapValues(_.toInt) /* the number of hours to keep a log file before deleting it */ - val logRetentionHours = props.getIntInRange("log.retention.hours", 24*7, (1, Int.MaxValue)) + val logRetentionTimeMillis = getLogRetentionTimeMillis /* the number of hours to keep a log file before deleting it for some specific topic*/ val logRetentionHoursPerTopicMap = props.getMap("log.retention.hours.per.topic", _.toInt > 0).mapValues(_.toInt) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c148fdf..5e35a89 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -253,7 +253,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg flushInterval = config.logFlushIntervalMessages, flushMs = config.logFlushIntervalMs.toLong, retentionSize = config.logRetentionBytes, - retentionMs = 60L * 60L * 1000L * config.logRetentionHours, + retentionMs = config.logRetentionTimeMillis, maxMessageSize = config.messageMaxBytes, maxIndexSize = config.logIndexSizeMaxBytes, indexInterval = config.logIndexIntervalBytes, diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala new file mode 100644 index 0000000..2f75e1d --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.junit.Test +import junit.framework.Assert._ +import org.scalatest.junit.JUnit3Suite +import kafka.utils.TestUtils + +class KafkaConfigTest extends JUnit3Suite { + + @Test + def testLogRetentionTimeHoursProvided() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("log.retention.hours", "1") + + val cfg = new KafkaConfig(props) + assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis) + + } + + @Test + def testLogRetentionTimeMinutesProvided() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("log.retention.minutes", "30") + + val cfg = new KafkaConfig(props) + assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis) + + } + + @Test + def testLogRetentionTimeNoConfigProvided() { + val props = TestUtils.createBrokerConfig(0, 8181) + + val cfg = new KafkaConfig(props) + assertEquals(24 * 7 * 60L * 60L * 1000L, cfg.logRetentionTimeMillis) + + } + + @Test + def testLogRetentionTimeBothMinutesAndHoursProvided() { + val props = TestUtils.createBrokerConfig(0, 8181) + props.put("log.retention.minutes", "30") + props.put("log.retention.hours", "1") + + val cfg = new KafkaConfig(props) + assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis) + + } + +} \ No newline at end of file