From 482813d89f5ac4efa0560cd951efcb6b0361b774 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 22 Apr 2013 14:23:46 -0700
Subject: [PATCH] 1. deep iteration on recovery
 2. deep iteration on fetcher thread

---
 core/src/main/scala/kafka/log/Log.scala            |   10 +++++++++-
 .../scala/kafka/server/AbstractFetcherThread.scala |    2 +-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e38b95c..191c762 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -218,7 +218,15 @@ private[kafka] class Log(val dir: File,
         val entry = iter.next
         entry.message.ensureValid()
         if(validBytes - lastIndexEntry > indexIntervalBytes) {
-          segment.index.append(entry.offset, validBytes)
+          // we need to decompress the message, if required, to get the first offset of the messageset
+          val startOffset =
+            entry.message.compressionCodec match {
+              case NoCompressionCodec =>
+                entry.offset
+              case _ =>
+                ByteBufferMessageSet.decompress(entry.message).head.offset
+          }
+          segment.index.append(startOffset, validBytes)
           lastIndexEntry = validBytes
         }
         validBytes += MessageSet.entrySize(entry.message)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index fed3b86..adcb411 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -120,7 +120,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                   try {
                     val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                     val validBytes = messages.validBytes
-                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
+                    val newOffset = messages.lastOption match {
                       case Some(m: MessageAndOffset) => m.nextOffset
                       case None => currentOffset.get
                     }
-- 
1.7.1

