From 038c04ada2bd74afcb2b6eafed80d5e5260fdbea Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 11 Feb 2013 15:01:46 -0800
Subject: [PATCH] - Check the view size of FileMessageSet before writing the contents to the channel
 - Remove looping at FetchRespondSend level and give processor thread a chance to process other requests

---
 core/src/main/scala/kafka/api/FetchResponse.scala  |    2 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala |   10 ++++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 94650f1..051378a 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -220,7 +220,7 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
+      written += sends.writeTo(channel)
     }
     sent += written
     written
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 5845bb6..72d9d8d 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -71,7 +71,7 @@ class FileMessageSet private[kafka](val file: File,
   }
   
   /**
-   * Search forward for the file position of the last offset that is great than or equal to the target offset 
+   * Search forward for the file position of the last offset that is greater than or equal to the target offset
    * and return its physical position. If no such offsets are found, return null.
    */
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
@@ -97,8 +97,14 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * Write some of this set to the given channel, return the amount written
    */
-  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
+  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
+    // Ensure that the underlying size has not changed.
+    val newSize = scala.math.min(channel.size().toInt, limit) - start
+    if (newSize < _size.get()) {
+      throw new KafkaException("FileMessageSet size has been truncated during write. OldSize %d NewSize %d".format(_size.get(), newSize))
+    }
     channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+  }
   
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.
-- 
1.7.1

