From aa30ef42b4560bec2fea5b3bd35459b62e52f3bb Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 30 May 2013 15:29:55 -0700
Subject: [PATCH] Offset fix

---
 core/src/main/scala/kafka/log/Log.scala            |   15 +++++++++++++--
 core/src/main/scala/kafka/log/OffsetIndex.scala    |   20 +++++++++++++-------
 .../scala/unit/kafka/log/OffsetIndexTest.scala     |    3 ++-
 3 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index ef708e2..63cc383 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -143,6 +143,7 @@ private[kafka] class Log(val dir: File,
     // open all the segments read-only
     val logSegments = new ArrayList[LogSegment]
     val ls = dir.listFiles()
+
     if(ls != null) {
       for(file <- ls if file.isFile) {
         val filename = file.getName()
@@ -189,8 +190,18 @@ private[kafka] class Log(val dir: File,
       logSegments.get(logSegments.size() - 1).index.resize(maxIndexSize)
 
       // run recovery on the last segment if necessary
-      if(needsRecovery)
-        recoverSegment(logSegments.get(logSegments.size - 1))
+      if(needsRecovery) {
+        try {
+          recoverSegment(logSegments.get(logSegments.size - 1))
+        } catch {
+          case e: InvalidOffsetException =>
+            val startOffset = logSegments.get(logSegments.size - 1).start
+            info("Found invalid offset during recovery of the active segment. Deleting the segment and " +
+                 "creating an empty one with starting offset " + startOffset)
+            // truncate the active segment to its starting offset
+            logSegments.get(logSegments.size - 1).truncateTo(startOffset)
+        }
+      }
     }
 
     val segmentList = logSegments.toArray(new Array[LogSegment](logSegments.size))
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 60ebc52..fe64a01 100644
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -23,6 +23,7 @@ import java.nio._
 import java.nio.channels._
 import java.util.concurrent.atomic._
 import kafka.utils._
+import kafka.common.InvalidOffsetException
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -178,13 +179,18 @@ class OffsetIndex(val file: File, val baseOffset: Long, val maxIndexSize: Int =
   def append(offset: Long, position: Int) {
     this synchronized {
       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
-      require(size.get == 0 || offset > lastOffset, "Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d).".format(offset, entries, lastOffset))
-      debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
-      this.mmap.putInt((offset - baseOffset).toInt)
-      this.mmap.putInt(position)
-      this.size.incrementAndGet()
-      this.lastOffset = offset
-      require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+      if (size.get == 0 || offset > lastOffset) {
+        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
+        this.mmap.putInt((offset - baseOffset).toInt)
+        this.mmap.putInt(position)
+        this.size.incrementAndGet()
+        this.lastOffset = offset
+        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+      }
+      else {
+        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d)."
+          .format(offset, entries, lastOffset))
+      }
     }
   }
   
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 3b2c069..9213a5d 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -25,6 +25,7 @@ import org.scalatest.junit.JUnitSuite
 import scala.collection._
 import scala.util.Random
 import kafka.utils.TestUtils
+import kafka.common.InvalidOffsetException
 
 class OffsetIndexTest extends JUnitSuite {
   
@@ -89,7 +90,7 @@ class OffsetIndexTest extends JUnitSuite {
     assertWriteFails("Append should fail on a full index", idx, idx.maxEntries + 1, classOf[IllegalArgumentException])
   }
   
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[InvalidOffsetException])
   def appendOutOfOrder() {
     idx.append(51, 0)
     idx.append(50, 1)
-- 
1.7.1

