Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala =================================================================== --- core/src/test/scala/unit/kafka/log/LogManagerTest.scala (revision 1293324) +++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala (working copy) @@ -107,7 +107,7 @@ config = new KafkaConfig(props) { override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages override val enableZookeeper = false - override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Int] // keep exactly 6 segments + 1 roll over + override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long] // keep exactly 6 segments + 1 roll over override val logRetentionHours = retentionHours } logManager = new LogManager(config, null, time, -1, retentionMs, false) Index: core/src/main/scala/kafka/utils/Utils.scala =================================================================== --- core/src/main/scala/kafka/utils/Utils.scala (revision 1293324) +++ core/src/main/scala/kafka/utils/Utils.scala (working copy) @@ -194,6 +194,48 @@ } /** + * Read a required long property value or throw an exception if no such property is found + */ + def getLong(props: Properties, name: String): Long = { + if(props.containsKey(name)) + return getLong(props, name, -1) + else + throw new IllegalArgumentException("Missing required property '" + name + "'") + } + + /** + * Read an long from the properties instance + * @param props The properties to read from + * @param name The property name + * @param default The default value to use if the property is not found + * @return the long value + */ + def getLong(props: Properties, name: String, default: Long): Long = + getLongInRange(props, name, default, (Long.MinValue, Long.MaxValue)) + + /** + * Read an long from the properties instance. Throw an exception + * if the value is not in the given range (inclusive) + * @param props The properties to read from + * @param name The property name + * @param default The default value to use if the property is not found + * @param range The range in which the value must fall (inclusive) + * @throws IllegalArgumentException If the value is not in the given range + * @return the long value + */ + def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = { + val v = + if(props.containsKey(name)) + props.getProperty(name).toLong + else + default + if(v < range._1 || v > range._2) + throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".") + else + v + } + + /** * Read a boolean value from the properties instance * @param props The properties to read from * @param name The property name Index: core/src/main/scala/kafka/server/KafkaConfig.scala =================================================================== --- core/src/main/scala/kafka/server/KafkaConfig.scala (revision 1293324) +++ core/src/main/scala/kafka/server/KafkaConfig.scala (working copy) @@ -65,7 +65,7 @@ val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue)) /* the maximum size of the log before deleting it */ - val logRetentionSize = Utils.getInt(props, "log.retention.size", -1) + val logRetentionSize = Utils.getLong(props, "log.retention.size", -1) /* the number of hours to keep a log file before deleting it for some specific topic*/ val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))