From c4148f498bb75a486292e7239e408389888a6ab8 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Mon, 21 Jan 2013 16:22:58 -0800
Subject: [PATCH 1/5] 	modified:   core/src/main/scala/kafka/api/FetchRequest.scala
 	modified:   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
 	modified:   core/src/main/scala/kafka/api/OffsetRequest.scala
 	modified:   core/src/main/scala/kafka/api/ProducerRequest.scala
 	modified:   core/src/main/scala/kafka/api/RequestOrResponse.scala
 	modified:   core/src/main/scala/kafka/api/StopReplicaRequest.scala
 	modified:   core/src/main/scala/kafka/api/TopicMetadataRequest.scala
 	modified:   core/src/main/scala/kafka/cluster/Partition.scala
 	modified:   core/src/main/scala/kafka/network/RequestChannel.scala
 	modified:   core/src/main/scala/kafka/server/KafkaApis.scala

---
 core/src/main/scala/kafka/api/FetchRequest.scala |    8 +++++++-
 core/src/main/scala/kafka/server/KafkaApis.scala |    1 +
 2 files changed, 8 insertions(+), 1 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index ac74931..e203860 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -24,7 +24,8 @@ import scala.collection.immutable.Map
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.network.{RequestChannel}
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -146,8 +147,13 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
     fetchRequest.append("; CorrelationId: " + correlationId)
     fetchRequest.append("; ClientId: " + clientId)
     fetchRequest.append("; ReplicaId: " + replicaId)
+<<<<<<< HEAD
     fetchRequest.append("; MaxWait: " + maxWait + " ms")
     fetchRequest.append("; MinBytes: " + minBytes + " bytes")
+=======
+    fetchRequest.append("; MaxWait: " + maxWait)
+    fetchRequest.append("; MinBytes: " + minBytes)
+>>>>>>> 	modified:   core/src/main/scala/kafka/api/FetchRequest.scala
     fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     fetchRequest.toString()
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a1a11a..d1c9fbe 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -56,6 +56,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     try{
       if(requestLogger.isTraceEnabled)
         requestLogger.trace("Handling request: %s".format(request.requestObj))
+
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
-- 
1.7.1


From 4184592542c641e4cb0f7ee3b1fbb3be87fc3890 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 24 Jan 2013 11:30:00 -0800
Subject: [PATCH 2/5] 	modified:   core/src/main/scala/kafka/api/FetchRequest.scala
 	modified:   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
 	modified:   core/src/main/scala/kafka/api/ProducerRequest.scala
 	modified:   core/src/main/scala/kafka/api/StopReplicaRequest.scala
 	modified:   core/src/main/scala/kafka/server/KafkaApis.scala

---
 core/src/main/scala/kafka/api/FetchRequest.scala |    8 +-------
 core/src/main/scala/kafka/server/KafkaApis.scala |    1 -
 2 files changed, 1 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index e203860..ac74931 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -24,8 +24,7 @@ import scala.collection.immutable.Map
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.network.{BoundedByteBufferSend, RequestChannel}
-import kafka.network.RequestChannel.Response
+import kafka.network.{RequestChannel}
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -147,13 +146,8 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
     fetchRequest.append("; CorrelationId: " + correlationId)
     fetchRequest.append("; ClientId: " + clientId)
     fetchRequest.append("; ReplicaId: " + replicaId)
-<<<<<<< HEAD
     fetchRequest.append("; MaxWait: " + maxWait + " ms")
     fetchRequest.append("; MinBytes: " + minBytes + " bytes")
-=======
-    fetchRequest.append("; MaxWait: " + maxWait)
-    fetchRequest.append("; MinBytes: " + minBytes)
->>>>>>> 	modified:   core/src/main/scala/kafka/api/FetchRequest.scala
     fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     fetchRequest.toString()
   }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index d1c9fbe..0a1a11a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -56,7 +56,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     try{
       if(requestLogger.isTraceEnabled)
         requestLogger.trace("Handling request: %s".format(request.requestObj))
-
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
-- 
1.7.1


From 91a7820432d51b5142c491c15b47b700ef0a18d5 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 24 Jan 2013 16:18:18 -0800
Subject: [PATCH 3/5] 	modified:   core/src/main/scala/kafka/network/RequestChannel.scala
 	modified:   core/src/main/scala/kafka/network/SocketServer.scala
 	modified:   core/src/main/scala/kafka/server/KafkaApis.scala

---
 .../main/scala/kafka/network/RequestChannel.scala  |    5 +++--
 .../main/scala/kafka/network/SocketServer.scala    |    5 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |    2 +-
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 5185dec..636ab80 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,10 +25,11 @@ import kafka.api._
 import kafka.common.TopicAndPartition
 import kafka.utils.{Logging, SystemTime}
 import kafka.message.ByteBufferMessageSet
+import java.net._
 
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+  val AllDone = new Request(1, 2, getShutdownReceive(), 0, new InetSocketAddress(0))
 
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -39,7 +40,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long) {
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress) {
     @volatile var dequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e5dccd3..b056e25 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -313,11 +313,12 @@ private[kafka] class Processor(val id: Int,
       key.attach(receive)
     }
     val read = receive.readFrom(socketChannel)
-    trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
+    val address = socketChannel.socket.getRemoteSocketAddress();
+    trace(read + " bytes read from " + address)
     if(read < 0) {
       close(key)
     } else if(receive.complete) {
-      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds)
+      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
       requestChannel.sendRequest(req)
       trace("Received request, sending for processing by handler: " + req)
       key.attach(null)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0a1a11a..fac89a8 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,7 +55,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handle(request: RequestChannel.Request) {
     try{
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Handling request: %s".format(request.requestObj))
+        requestLogger.trace("Handling request: %s from remote address: %s".format(request.requestObj, request.remoteAddress))
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
-- 
1.7.1


From 88465a215fcaf211838739113d25adbb3f23e0ed Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Thu, 24 Jan 2013 17:23:37 -0800
Subject: [PATCH 4/5] 	modified:   core/src/main/scala/kafka/network/RequestChannel.scala

---
 .../main/scala/kafka/network/RequestChannel.scala  |    4 ++--
 1 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 636ab80..9b0f7e9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -29,7 +29,7 @@ import java.net._
 
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(1, 2, getShutdownReceive(), 0, new InetSocketAddress(0))
+  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
 
   def getShutdownReceive() = {
     val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -40,7 +40,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress) {
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0)) {
     @volatile var dequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
-- 
1.7.1


From 8b160b819720ec66d7782d4351b00407cb66f8cb Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Fri, 25 Jan 2013 09:31:51 -0800
Subject: [PATCH 5/5] 	modified:   core/src/main/scala/kafka/server/KafkaApis.scala

---
 core/src/main/scala/kafka/server/KafkaApis.scala |    2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fac89a8..b7b12cb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,7 +55,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handle(request: RequestChannel.Request) {
     try{
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Handling request: %s from remote address: %s".format(request.requestObj, request.remoteAddress))
+        requestLogger.trace("Handling request: %s from client: %s".format(request.requestObj, request.remoteAddress))
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
-- 
1.7.1

