From 8b135846e69aaccd32eddbd03adac9f07d52d687 Mon Sep 17 00:00:00 2001
From: Kelvin Rutt <kelvin.rutt@sky.com>
Date: Wed, 14 Jun 2017 01:15:47 +0100
Subject: [PATCH] KAFKA-5413: quick patch

---
 core/src/main/scala/kafka/log/LogCleaner.scala |  2 +-
 core/src/main/scala/kafka/log/LogSegment.scala | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5f06a73..861e201 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -563,7 +563,7 @@ private[log] class Cleaner(val id: Int,
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
             timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
-            segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
+            segs.head.lastOffset() - group.last.index.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.index.sizeInBytes
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 8854c3a..bf5a89f 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -319,6 +319,19 @@ class LogSegment(val log: FileRecords,
     }
   }
 
+  def lastOffset(): Long = {
+     //workaround: index file was zero but log contains entries - calculate the last offset
+     if ( index.baseOffset == index.lastOffset && log.sizeInBytes() > 0)
+     {
+          nextOffset() - 1
+     }
+     else
+     {
+         index.lastOffset
+     }
+  }
+
+
   /**
    * Flush this log segment to disk
    */
-- 
2.7.4

