From 038c04ada2bd74afcb2b6eafed80d5e5260fdbea Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 11 Feb 2013 15:01:46 -0800
Subject: [PATCH 1/2] - Check the view size of FileMessageSet before writing the contents to the channel
 - Remove looping at FetchRespondSend level and give processor thread a chance to process other requests

---
 core/src/main/scala/kafka/api/FetchResponse.scala  |    2 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala |   10 ++++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 94650f1..051378a 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -220,7 +220,7 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
+      written += sends.writeTo(channel)
     }
     sent += written
     written
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 5845bb6..72d9d8d 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -71,7 +71,7 @@ class FileMessageSet private[kafka](val file: File,
   }
   
   /**
-   * Search forward for the file position of the last offset that is great than or equal to the target offset 
+   * Search forward for the file position of the last offset that is greater than or equal to the target offset
    * and return its physical position. If no such offsets are found, return null.
    */
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
@@ -97,8 +97,14 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * Write some of this set to the given channel, return the amount written
    */
-  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
+  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
+    // Ensure that the underlying size has not changed.
+    val newSize = scala.math.min(channel.size().toInt, limit) - start
+    if (newSize < _size.get()) {
+      throw new KafkaException("FileMessageSet size has been truncated during write. OldSize %d NewSize %d".format(_size.get(), newSize))
+    }
     channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+  }
   
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.
-- 
1.7.1


From c009a22adcc5fa56c3d38a5ed8dc3103d45b529c Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 11 Feb 2013 16:30:07 -0800
Subject: [PATCH 2/2] - writeTo of MultiSend writes till the buffer is full

---
 core/src/main/scala/kafka/api/FetchResponse.scala  |    2 +-
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 .../main/scala/kafka/network/Transmission.scala    |   16 +++++++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 051378a..e528742 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -125,7 +125,7 @@ class TopicDataSend(val topicData: TopicData) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
+      written += sends.writeTo(channel)
     }
     sent += written
     written
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4aea6f3..1c71900 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -341,7 +341,7 @@ private[kafka] class Processor(val id: Int,
     if(responseSend == null)
       throw new IllegalStateException("Registered for write interest but no response attached to key.")
     val written = responseSend.writeTo(socketChannel)
-    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + "using key" + key)
     if(responseSend.complete) {
       response.request.updateRequestMetrics()
       key.attach(null)
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index f87e9d0..6914560 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -91,11 +91,17 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
 
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete
-    val written = current.head.writeTo(channel)
-    totalWritten += written
-    if(current.head.complete)
-      current = current.tail
-    written
+    var totalWrite = 0
+    var sendComplete: Boolean = false
+    do {
+      val written = current.head.writeTo(channel)
+      totalWritten += written
+      totalWrite += written
+      sendComplete = current.head.complete
+      if(sendComplete)
+        current = current.tail
+    } while (!complete && sendComplete)
+    totalWrite
   }
   
   def complete: Boolean = {
-- 
1.7.1

