From 832d0f31f4ea0879e75ba776dd51f60aa2542a78 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 13 Oct 2014 15:54:17 -0700 Subject: [PATCH 1/2] KAFKA-979 Add optional random jitter for time based log rolling. Add new options log.roll.jitter.ms and log.roll.jitter.hours to add random jitter to time-based log rolling so logs aren't likely to roll at exactly the same time. Jitter always reduces the timeout so log.roll.ms still provides a soft maximum. Defaults to 0 so no jitter is added by default. --- core/src/main/scala/kafka/log/Log.scala | 8 +++++-- core/src/main/scala/kafka/log/LogCleaner.scala | 2 +- core/src/main/scala/kafka/log/LogConfig.scala | 10 +++++++++ core/src/main/scala/kafka/log/LogSegment.scala | 6 +++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 16 ++++++++++++- core/src/main/scala/kafka/server/KafkaServer.scala | 3 ++- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 2 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 26 ++++++++++++++++++++++ 8 files changed, 65 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index a123cdc..157d673 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -141,6 +141,7 @@ class Log(val dir: File, startOffset = start, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, time = time) if(!hasIndex) { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) @@ -156,6 +157,7 @@ class Log(val dir: File, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, time = time)) } else { recoverLog() @@ -510,7 +512,7 @@ class Log(val dir: File, private def maybeRoll(messagesSize: Int): LogSegment = { val segment = activeSegment if (segment.size > 
config.segmentSize - messagesSize || - segment.size > 0 && time.milliseconds - segment.created > config.segmentMs || + segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs || segment.index.isFull) { debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)." .format(name, @@ -519,7 +521,7 @@ class Log(val dir: File, segment.index.entries, segment.index.maxEntries, time.milliseconds - segment.created, - config.segmentMs)) + config.segmentMs - segment.rollJitterMs)) roll() } else { segment @@ -550,6 +552,7 @@ class Log(val dir: File, startOffset = newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, time = time) val prev = addSegment(segment) if(prev != null) @@ -642,6 +645,7 @@ class Log(val dir: File, newOffset, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, + rollJitterMs = config.randomSegmentJitter, time = time)) updateLogEndOffset(newOffset) this.recoveryPoint = math.min(newOffset, this.recoveryPoint) diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index c20de4a..f8fcb84 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -340,7 +340,7 @@ private[log] class Cleaner(val id: Int, indexFile.delete() val messages = new FileMessageSet(logFile) val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) - val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, time) + val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) try { // clean segments into the new destination segment diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala index d2cc9e3..b8ba87d 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -24,6 +24,7 @@ import kafka.common._ object Defaults { val SegmentSize = 1024 * 1024 val SegmentMs = Long.MaxValue + val SegmentJitterMs = 0L val FlushInterval = Long.MaxValue val FlushMs = Long.MaxValue val RetentionSize = Long.MaxValue @@ -43,6 +44,7 @@ object Defaults { * Configuration settings for a log * @param segmentSize The soft maximum for the size of a segment file in the log * @param segmentMs The soft maximum on the amount of time before a new log segment is rolled + * @param segmentJitterMs The maximum random jitter subtracted from segmentMs to avoid thundering herds of segment rolling * @param flushInterval The number of messages that can be written to the log before a flush is forced * @param flushMs The amount of time the log can have dirty data before a flush is forced * @param retentionSize The approximate total number of bytes this log can use @@ -60,6 +62,7 @@ object Defaults { */ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, val segmentMs: Long = Defaults.SegmentMs, + val segmentJitterMs: Long = Defaults.SegmentJitterMs, val flushInterval: Long = Defaults.FlushInterval, val flushMs: Long = Defaults.FlushMs, val retentionSize: Long = Defaults.RetentionSize, @@ -79,6 +82,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, import LogConfig._ props.put(SegmentBytesProp, segmentSize.toString) props.put(SegmentMsProp, segmentMs.toString) + props.put(SegmentJitterMsProp, segmentJitterMs.toString) props.put(SegmentIndexBytesProp, maxIndexSize.toString) props.put(FlushMessagesProp, flushInterval.toString) props.put(FlushMsProp, flushMs.toString) @@ -94,11 +98,15 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, props.put(MinInSyncReplicasProp, minInSyncReplicas.toString) props } + + def randomSegmentJitter: Long = 
+ if (segmentJitterMs == 0) 0 else scala.util.Random.nextLong() % Math.min(segmentJitterMs, segmentMs) } object LogConfig { val SegmentBytesProp = "segment.bytes" val SegmentMsProp = "segment.ms" + val SegmentJitterMsProp = "segment.jitter.ms" val SegmentIndexBytesProp = "segment.index.bytes" val FlushMessagesProp = "flush.messages" val FlushMsProp = "flush.ms" @@ -115,6 +123,7 @@ object LogConfig { val ConfigNames = Set(SegmentBytesProp, SegmentMsProp, + SegmentJitterMsProp, SegmentIndexBytesProp, FlushMessagesProp, FlushMsProp, @@ -135,6 +144,7 @@ object LogConfig { def fromProps(props: Properties): LogConfig = { new LogConfig(segmentSize = props.getProperty(SegmentBytesProp, Defaults.SegmentSize.toString).toInt, segmentMs = props.getProperty(SegmentMsProp, Defaults.SegmentMs.toString).toLong, + segmentJitterMs = props.getProperty(SegmentJitterMsProp, Defaults.SegmentJitterMs.toString).toLong, maxIndexSize = props.getProperty(SegmentIndexBytesProp, Defaults.MaxIndexSize.toString).toInt, flushInterval = props.getProperty(FlushMessagesProp, Defaults.FlushInterval.toString).toLong, flushMs = props.getProperty(FlushMsProp, Defaults.FlushMs.toString).toLong, diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 7597d30..ac96434 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -44,18 +44,20 @@ class LogSegment(val log: FileMessageSet, val index: OffsetIndex, val baseOffset: Long, val indexIntervalBytes: Int, + val rollJitterMs: Long, time: Time) extends Logging { var created = time.milliseconds - + /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 - def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, time: Time) = + def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) = this(new 
FileMessageSet(file = Log.logFilename(dir, startOffset)), new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, + rollJitterMs, time) /* Return the size in bytes of this log segment */ diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 7fcbc16..6e26c54 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -57,7 +57,18 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro millisInHour * props.getIntInRange("log.roll.hours", 24*7, (1, Int.MaxValue)) } } - + + private def getLogRollTimeJitterMillis(): Long = { + val millisInHour = 60L * 60L * 1000L + + if(props.containsKey("log.roll.jitter.ms")) { + props.getIntInRange("log.roll.jitter.ms", (0, Int.MaxValue)) + } + else { + millisInHour * props.getIntInRange("log.roll.jitter.hours", 0, (0, Int.MaxValue)) + } + } + /*********** General Configuration ***********/ /* the broker id for this server */ @@ -131,6 +142,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro /* the maximum time before a new log segment is rolled out */ val logRollTimeMillis = getLogRollTimeMillis + /* the maximum jitter to subtract from logRollTimeMillis */ + val logRollTimeJitterMillis = getLogRollTimeJitterMillis + /* the number of hours to keep a log file before deleting it */ val logRetentionTimeMillis = getLogRetentionTimeMillis diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 3e9e91f..07c0a07 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -309,8 +309,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg def getLogManager(): LogManager = logManager private def 
createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { - val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, + val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, segmentMs = config.logRollTimeMillis, + segmentJitterMs = config.logRollTimeJitterMillis, flushInterval = config.logFlushIntervalMessages, flushMs = config.logFlushIntervalMs.toLong, retentionSize = config.logRetentionBytes, diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala index 7b97e6a..03fb351 100644 --- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala @@ -39,7 +39,7 @@ class LogSegmentTest extends JUnit3Suite { val idxFile = TestUtils.tempFile() idxFile.delete() val idx = new OffsetIndex(idxFile, offset, 1000) - val seg = new LogSegment(ms, idx, offset, 10, SystemTime) + val seg = new LogSegment(ms, idx, offset, 10, 0, SystemTime) segments += seg seg } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index a0cbd3b..d670ba7 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -88,6 +88,32 @@ class LogTest extends JUnitSuite { } /** + * Test for jitter for time based log roll. This test appends messages then changes the time + using the mock clock to force the log to roll and checks the number of segments. 
+ */ + @Test + def testTimeBasedLogRollJitter() { + val set = TestUtils.singleMessageSet("test".getBytes()) + val maxJitter = 20 * 60L + + // create a log + val log = new Log(logDir, + logConfig.copy(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter), + recoveryPoint = 0L, + scheduler = time.scheduler, + time = time) + assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) + log.append(set) + + time.sleep(log.config.segmentMs - maxJitter) + log.append(set) + assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments); + time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1) + log.append(set) + assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments) + } + + /** * Test that appending more than the maximum segment size rolls the log */ @Test -- 2.1.2 From ecb3125344708577a5c6a465ef186ef69b95749c Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 14 Oct 2014 15:31:56 -0700 Subject: [PATCH 2/2] Addressing warning and Util.abs comments. --- core/src/main/scala/kafka/log/LogConfig.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index b8ba87d..e48922a 100644 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -18,6 +18,8 @@ package kafka.log import java.util.Properties +import org.apache.kafka.common.utils.Utils + import scala.collection._ import kafka.common._ @@ -100,7 +102,7 @@ case class LogConfig(val segmentSize: Int = Defaults.SegmentSize, } def randomSegmentJitter: Long = - if (segmentJitterMs == 0) 0 else scala.util.Random.nextLong() % Math.min(segmentJitterMs, segmentMs) + if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) } object LogConfig { -- 2.1.2