From 42de8cfe91b84f5a2eed7daeac271e9fdea3f908 Mon Sep 17 00:00:00 2001
From: "Joris V.R" <jvanremoortere@tagged.com>
Date: Wed, 4 Sep 2013 17:03:36 -0700
Subject: [PATCH] When flush was being called on Log, the logSegments function
 (which decides which segments actually need to get flushed)
 missed the case where only part of a segment needed to get
 flushed. The unit test for time based roll-over assumed no
 new segment was created even though a non-empty segment
 passed its old-age mark.

---
 core/src/main/scala/kafka/log/Log.scala          | 13 +++++++++++--
 core/src/test/scala/unit/kafka/log/LogTest.scala |  8 +++++---
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 626eb8f..86e726b 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -587,8 +587,17 @@ class Log(val dir: File,
    * Get all segments beginning with the segment that includes "from" and ending with the segment
    * that includes up to "to-1" or the end of the log (if to > logEndOffset)
    */
-  def logSegments(from: Long, to: Long) = asIterable(segments.subMap(from, true, to, false).values)
-  
+  def logSegments(from: Long, to: Long) : Iterable[LogSegment] = {
+    lock synchronized {
+      val start_key = Option(segments.floorKey(from))
+      if (start_key.isDefined) {
+        return asIterable(segments.subMap(start_key.get, true, to, false).values)
+      } else {
+        return asIterable(segments.headMap(to).values)
+      }
+    }
+  }
+ 
   override def toString() = "Log(" + dir + ")"
   
   /**
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b7f43e2..6351ecf 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -72,14 +72,16 @@ class LogTest extends JUnitSuite {
                       time = time)
     time.sleep(log.config.segmentMs + 1)
 
-    // segment age is less than its limit
+    // segment age is higher than its limit 
     log.append(set)
+    // there should be 1 segment because the log doesn't get rolled over when it's empty
     assertEquals("There should be exactly one segment.", 1, log.numberOfSegments)
 
     log.append(set)
-    assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
+    // there should now be 2 segments because the segment has passed its age limit and has data
+    assertEquals("There should be exactly two segments.", 2, log.numberOfSegments)
 
-    for(numSegments <- 2 until 4) {
+    for(numSegments <- 3 until 5) {
       time.sleep(log.config.segmentMs + 1)
       log.append(set)
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
-- 
1.8.0

