From b9e855c1b02bd757c20dfc2e72e4c839770357ce Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Fri, 18 Oct 2013 11:15:39 -0700 Subject: [PATCH] KAFKA-1042 logSegments(from, to) Misses first segment. I think this patch also fixes the continual timing problems we have had in log tests. The root cause was that we weren't passing through the clock instance so we were mixing instances of MockTime and SystemTime. This worked only because MockTime initializes to SystemTime.milliseconds so as long as the test took less than 1 ms it worked! --- core/src/main/scala/kafka/log/Log.scala | 20 +++++++++++++++----- core/src/main/scala/kafka/log/LogSegment.scala | 4 ++-- core/src/test/scala/unit/kafka/log/LogTest.scala | 12 +++++------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 9fe61ff..008cc55 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -132,7 +132,8 @@ class Log(val dir: File, val segment = new LogSegment(dir = dir, startOffset = start, indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize) + maxIndexSize = config.maxIndexSize, + time = time) if(!hasIndex) { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) @@ -146,7 +147,8 @@ class Log(val dir: File, segments.put(0, new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize)) + maxIndexSize = config.maxIndexSize, + time = time)) } else { recoverLog() // reset the index size of the currently active log segment to allow more entries @@ -472,7 +474,8 @@ class Log(val dir: File, val segment = new LogSegment(dir, startOffset = newOffset, indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize) + maxIndexSize = config.maxIndexSize, + time = time) val prev = addSegment(segment) if(prev != null) throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset)) @@ -561,7 +564,8 @@ class Log(val dir: File, addSegment(new LogSegment(dir, newOffset, indexIntervalBytes = config.indexInterval, - maxIndexSize = config.maxIndexSize)) + maxIndexSize = config.maxIndexSize, + time = time)) this.nextOffset.set(newOffset) this.recoveryPoint = math.min(newOffset, this.recoveryPoint) truncates.getAndIncrement @@ -592,7 +596,13 @@ class Log(val dir: File, */ def logSegments(from: Long, to: Long): Iterable[LogSegment] = { import JavaConversions._ - segments.subMap(from, true, to, false).values + lock synchronized { + val floor: java.lang.Long = segments.floorKey(from) + if(floor eq null) + asIterable(segments.headMap(to).values) + else + asIterable(segments.subMap(floor, true, to, false).values) + } } override def toString() = "Log(" + dir + ")" diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index fe39d79..0d6926e 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -48,12 +48,12 @@ class LogSegment(val log: FileMessageSet, /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 - def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int) = + def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, time: Time) = this(new FileMessageSet(file = Log.logFilename(dir, startOffset)), new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize), startOffset, indexIntervalBytes, - SystemTime) + time) /* Return the size in bytes of this log segment */ def size: Long = log.sizeInBytes() diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 18d2e7c..140317c 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -32,7 +32,7 @@ import kafka.server.KafkaConfig class LogTest extends JUnitSuite { var logDir: File = null - val time = new MockTime + val time = new MockTime(0) var config: KafkaConfig = null val logConfig = LogConfig() @@ -62,7 +62,6 @@ class LogTest extends JUnitSuite { @Test def testTimeBasedLogRoll() { val set = TestUtils.singleMessageSet("test".getBytes()) - val time: MockTime = new MockTime() // create a log val log = new Log(logDir, @@ -70,16 +69,15 @@ class LogTest extends JUnitSuite { recoveryPoint = 0L, scheduler = time.scheduler, time = time) + assertEquals("Log begins with a single empty segment.", 1, log.numberOfSegments) time.sleep(log.config.segmentMs + 1) - - // segment age is less than its limit log.append(set) - assertEquals("There should be exactly one segment.", 1, log.numberOfSegments) + assertEquals("Log doesn't roll if doing so creates an empty segment.", 1, log.numberOfSegments) log.append(set) - assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments) + assertEquals("Log rolls on this append since time has expired.", 2, log.numberOfSegments) - for(numSegments <- 2 until 4) { + for(numSegments <- 3 until 5) { time.sleep(log.config.segmentMs + 1) log.append(set) assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments) -- 1.7.12.4 (Apple Git-37)