From 7d6a41546121512b1c1711c8683316f8fdc2f3c5 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 4 Mar 2013 14:41:48 -0800
Subject: [PATCH] check fetch offset before writing to log

---
 .../scala/kafka/server/AbstractFetcherThread.scala |    3 ++-
 1 files changed, 2 insertions(+), 1 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index a7d39b1..4ee23cd 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -114,7 +114,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
             val currentOffset = partitionMap.get(topicAndPartition)
-            if (currentOffset.isDefined) {
+            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+            if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
               partitionData.error match {
                 case ErrorMapping.NoError =>
                   val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-- 
1.7.1

