From a344454819753cf88e024bcbdb201757287828f4 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 11 Feb 2013 15:01:46 -0800
Subject: [PATCH 1/3] - Check the view size of FileMessageSet before writing the contents to the channel
 - Remove looping at FetchRespondSend level and give processor thread a chance to process other requests

---
 core/src/main/scala/kafka/api/FetchResponse.scala  |    2 +-
 core/src/main/scala/kafka/log/FileMessageSet.scala |   10 ++++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 94650f1..051378a 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -220,7 +220,7 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
+      written += sends.writeTo(channel)
     }
     sent += written
     written
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 5845bb6..72d9d8d 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -71,7 +71,7 @@ class FileMessageSet private[kafka](val file: File,
   }
   
   /**
-   * Search forward for the file position of the last offset that is great than or equal to the target offset 
+   * Search forward for the file position of the last offset that is greater than or equal to the target offset
    * and return its physical position. If no such offsets are found, return null.
    */
   private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
@@ -97,8 +97,14 @@ class FileMessageSet private[kafka](val file: File,
   /**
    * Write some of this set to the given channel, return the amount written
    */
-  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int =
+  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
+    // Ensure that the underlying size has not changed.
+    val newSize = scala.math.min(channel.size().toInt, limit) - start
+    if (newSize < _size.get()) {
+      throw new KafkaException("FileMessageSet size has been truncated during write. OldSize %d NewSize %d".format(_size.get(), newSize))
+    }
     channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+  }
   
   /**
    * Get an iterator over the messages in the set. We only do shallow iteration here.
-- 
1.7.1


From 3df2c086eac58acc2c882c00014fe259b4374159 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 11 Feb 2013 16:30:07 -0800
Subject: [PATCH 2/3] - writeTo of MultiSend writes till the buffer is full

---
 core/src/main/scala/kafka/api/FetchResponse.scala  |    2 +-
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 .../main/scala/kafka/network/Transmission.scala    |   16 +++++++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index 051378a..e528742 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -125,7 +125,7 @@ class TopicDataSend(val topicData: TopicData) extends Send {
     if(buffer.hasRemaining)
       written += channel.write(buffer)
     if(!buffer.hasRemaining && !sends.complete) {
-      written += sends.writeCompletely(channel)
+      written += sends.writeTo(channel)
     }
     sent += written
     written
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4aea6f3..1c71900 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -341,7 +341,7 @@ private[kafka] class Processor(val id: Int,
     if(responseSend == null)
       throw new IllegalStateException("Registered for write interest but no response attached to key.")
     val written = responseSend.writeTo(socketChannel)
-    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
+    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + "using key" + key)
     if(responseSend.complete) {
       response.request.updateRequestMetrics()
       key.attach(null)
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index f87e9d0..6914560 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -91,11 +91,17 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
 
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete
-    val written = current.head.writeTo(channel)
-    totalWritten += written
-    if(current.head.complete)
-      current = current.tail
-    written
+    var totalWrite = 0
+    var sendComplete: Boolean = false
+    do {
+      val written = current.head.writeTo(channel)
+      totalWritten += written
+      totalWrite += written
+      sendComplete = current.head.complete
+      if(sendComplete)
+        current = current.tail
+    } while (!complete && sendComplete)
+    totalWrite
   }
   
   def complete: Boolean = {
-- 
1.7.1


From def23a216480f5d598bfaf5fdf2b0ab18da0e5e0 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Tue, 12 Feb 2013 11:17:51 -0800
Subject: [PATCH 3/3] Add more tracing

removed some files

changes
---
 core/src/main/scala/kafka/log/FileMessageSet.scala |    4 +++-
 .../main/scala/kafka/network/SocketServer.scala    |    2 +-
 .../main/scala/kafka/network/Transmission.scala    |   10 +++++++---
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 72d9d8d..6d73be1 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -103,7 +103,9 @@ class FileMessageSet private[kafka](val file: File,
     if (newSize < _size.get()) {
       throw new KafkaException("FileMessageSet size has been truncated during write. OldSize %d NewSize %d".format(_size.get(), newSize))
     }
-    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+    val bytesTransferred = channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel).toInt
+    trace("FileMessageSet : Bytes transferred : " + bytesTransferred + " Bytes requested for transfer : " + scala.math.min(size, sizeInBytes))
+    bytesTransferred
   }
   
   /**
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 1c71900..a78c6f0 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -341,7 +341,7 @@ private[kafka] class Processor(val id: Int,
     if(responseSend == null)
       throw new IllegalStateException("Registered for write interest but no response attached to key.")
     val written = responseSend.writeTo(socketChannel)
-    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + "using key" + key)
+    trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + "using key " + key)
     if(responseSend.complete) {
       response.request.updateRequestMetrics()
       key.attach(null)
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 6914560..2bde159 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -89,19 +89,23 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
   private var current = sends
   var totalWritten = 0
 
+  // This method continues to write to the socket buffer till an incomplete
+  // write happens. On an incomplete write, it returns to the caller to give it
+  // a chance to schedule other work till the buffered write completes.
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete
-    var totalWrite = 0
+    var totalWritePerCall = 0
     var sendComplete: Boolean = false
     do {
       val written = current.head.writeTo(channel)
       totalWritten += written
-      totalWrite += written
+      totalWritePerCall += written
       sendComplete = current.head.complete
       if(sendComplete)
         current = current.tail
     } while (!complete && sendComplete)
-    totalWrite
+    trace("Bytes written as part of multisend call : " + totalWritePerCall +  "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + expectedBytesToWrite)
+    totalWritePerCall
   }
   
   def complete: Boolean = {
-- 
1.7.1

