From 8879534772a5d72d51d21dc4cd8dd8411e1df431 Mon Sep 17 00:00:00 2001
From: Sriram Subramanian <srsubram@srsubram-ld.linkedin.biz>
Date: Tue, 15 Jan 2013 15:55:48 -0800
Subject: [PATCH] Move per-API error handling into RequestOrResponse.handleError overrides

---
 core/src/main/scala/kafka/api/FetchRequest.scala   |   26 ++++++++++++++++++-
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |   24 +++++++++++++++++
 core/src/main/scala/kafka/api/OffsetRequest.scala  |   24 +++++++++++++++++-
 .../src/main/scala/kafka/api/ProducerRequest.scala |   24 +++++++++++++++++-
 .../main/scala/kafka/api/RequestOrResponse.scala   |    8 ++++-
 .../main/scala/kafka/api/StopReplicaRequest.scala  |   27 ++++++++++++++++++-
 .../scala/kafka/api/TopicMetadataRequest.scala     |   23 ++++++++++++++++-
 core/src/main/scala/kafka/cluster/Partition.scala  |    4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |    4 +++
 9 files changed, 154 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index b4fb874..0b7aabb 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -21,9 +21,12 @@ import java.nio.ByteBuffer
 import kafka.utils.nonthreadsafe
 import kafka.api.ApiUtils._
 import scala.collection.immutable.Map
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.atomic.AtomicInteger
+import kafka.message.MessageSet
+import kafka.network.RequestChannel
+
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -121,6 +124,28 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   def isFromLowLevelConsumer = replicaId == Request.DebuggingConsumerId
 
   def numPartitions = requestInfo.size
+
+  override def toString(): String = {
+    val fetchRequest = new StringBuilder
+    fetchRequest.append("Version: " + versionId)
+    fetchRequest.append("; CorrelationId: " + correlationId)
+    fetchRequest.append("; ClientId: " + clientId)
+    fetchRequest.append("; ReplicaId: " + replicaId)
+    fetchRequest.append("; MaxWait: " + maxWait)
+    fetchRequest.append("; MinBytes: " + minBytes)
+    fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    fetchRequest.toString()
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val fetchResponsePartitionData = requestInfo.map {
+      case (topicAndPartition, _) =>
+        (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
+    }
+    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
+    error("error when handling request %s".format(this), e)
+  }
 }
 
 
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 99af002..fe0deb4 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -23,6 +23,9 @@ import kafka.utils._
 import kafka.api.ApiUtils._
 import kafka.cluster.Broker
 import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
 
 
 object LeaderAndIsr {
@@ -157,4 +160,25 @@ case class LeaderAndIsrRequest (versionId: Short,
       size += broker.sizeInBytes /* broker info */
     size
   }
+
+  override def toString(): String = {
+    val leaderAndIsrRequest = new StringBuilder
+    leaderAndIsrRequest.append("Version: " + versionId)
+    leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
+    leaderAndIsrRequest.append("; ClientId: " + clientId)
+    leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs)
+    leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
+    leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
+    leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
+    leaderAndIsrRequest.toString()
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val responseMap = partitionStateInfos.map {
+      case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    error("error when handling request %s".format(this), e)
+  }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 6c522bc..02b6cc6 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -18,8 +18,10 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.api.ApiUtils._
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
 
 
 object OffsetRequest {
@@ -104,4 +106,24 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
 
   def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
   def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
+
+  override def toString(): String = {
+    val offsetRequest = new StringBuilder
+    offsetRequest.append("Version: " + versionId)
+    offsetRequest.append("; CorrelationId: " + correlationId)
+    offsetRequest.append("; ClientId: " + clientId)
+    offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+    offsetRequest.append("; ReplicaId: " + replicaId)
+    offsetRequest.toString()
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val partitionOffsetResponseMap = requestInfo.map {
+      case (topicAndPartition, partitionOffsetRequest) =>
+        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil))
+    }
+    val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    error("error when handling request %s".format(this), e)
+  }
 }
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index ffa96a6..2c3de2c 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -20,8 +20,10 @@ package kafka.api
 import java.nio._
 import kafka.message._
 import scala.collection.Map
-import kafka.common.TopicAndPartition
 import kafka.api.ApiUtils._
+import kafka.common._
+import kafka.network.RequestChannel.Response
+import kafka.network.{RequestChannel, BoundedByteBufferSend}
 
 object ProducerRequest {
   val CurrentVersion = 0.shortValue
@@ -120,5 +122,25 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
 
   def numPartitions = data.size
 
+  override def toString(): String = {
+    val producerRequest = new StringBuilder
+    producerRequest.append("Version: " + versionId)
+    producerRequest.append("; CorrelationId: " + correlationId)
+    producerRequest.append("; ClientId: " + clientId)
+    producerRequest.append("; RequiredAcks: " + requiredAcks)
+    producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs)
+    producerRequest.append("; TopicAndPartition: " + data.keys.mkString(","))
+    producerRequest.toString()
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val producerResponseStatus = data.map {
+      case (topicAndPartition, _) =>
+        (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L))
+    }
+    val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    error("error when handling request %s".format(this), e)
+  }
 }
 
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 83ad42c..3175e1c 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -18,6 +18,8 @@ package kafka.api
  */
 
 import java.nio._
+import kafka.network.RequestChannel
+import kafka.utils.Logging
 
 object Request {
   val OrdinaryConsumerId: Int = -1
@@ -25,10 +27,12 @@ object Request {
 }
 
 
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging {
 
   def sizeInBytes: Int
   
   def writeTo(buffer: ByteBuffer): Unit
-  
+
+  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {}
 }
+
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 9fe849b..9303e97 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -20,8 +20,10 @@ package kafka.api
 
 import java.nio._
 import kafka.api.ApiUtils._
-import kafka.utils.Logging
-import kafka.network.InvalidRequestException
+import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
 
 
 object StopReplicaRequest extends Logging {
@@ -93,4 +95,25 @@ case class StopReplicaRequest(versionId: Short,
     }
     size
   }
+
+  override def toString(): String = {
+    val stopReplicaRequest = new StringBuilder
+    stopReplicaRequest.append("Version: " + versionId)
+    stopReplicaRequest.append("; CorrelationId: " + correlationId)
+    stopReplicaRequest.append("; ClientId: " + clientId)
+    stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs)
+    stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
+    stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
+    stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
+    stopReplicaRequest.toString()
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val responseMap = partitions.map {
+      case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }.toMap
+    error("error when handling request %s".format(this), e)
+    val errorResponse = StopReplicaResponse(correlationId, responseMap)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
 }
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index fe1170f..8f91bba 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -20,7 +20,10 @@ package kafka.api
 import java.nio.ByteBuffer
 import kafka.api.ApiUtils._
 import collection.mutable.ListBuffer
-import kafka.utils.Logging
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.Logging
 
 object TopicMetadataRequest extends Logging {
   val CurrentVersion = 0.shortValue
@@ -67,4 +70,22 @@ case class TopicMetadataRequest(val versionId: Short,
     4 + /* number of topics */
     topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
   }
+
+  override def toString(): String = {
+    val topicMetadataRequest = new StringBuilder
+    topicMetadataRequest.append("Version: " + versionId)
+    topicMetadataRequest.append("; CorrelationId: " + correlationId)
+    topicMetadataRequest.append("; ClientId: " + clientId)
+    topicMetadataRequest.append("; Topics: " + topics.mkString(","))
+    topicMetadataRequest.toString()
+  }
+
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    val topicMetadata = topics.map {
+      topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    }
+    val errorResponse = TopicMetadataResponse(topicMetadata, correlationId)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+    error("error when handling request %s".format(this), e)
+  }
 }
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ea5b5a0..71eb980 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -337,8 +337,8 @@ class Partition(val topic: String,
     partitionString.append("Topic: " + topic)
     partitionString.append("; Partition: " + partitionId)
     partitionString.append("; Leader: " + leaderReplicaIdOpt)
-    partitionString.append("; Assigned replicas: " + assignedReplicaMap.keys.mkString(","))
-    partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+    partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+    partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
     partitionString.toString()
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 60752fb..7513552 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -118,6 +118,9 @@ class KafkaApis(val requestChannel: RequestChannel,
             val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
         }
+        // NOTE(review): the requestId match above already sends a per-API error response;
+        // it should be removed in favor of this generic dispatch so only one response is sent per request.
+        request.requestObj.handleError(e, requestChannel, request)
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
-- 
1.7.1

