From 5f107049514eb074a02c70e0e54366dfc7cd41ac Mon Sep 17 00:00:00 2001
From: Anton Karamanov <ataraxer@yandex-team.ru>
Date: Mon, 22 Sep 2014 15:02:50 +0400
Subject: [PATCH] KAFKA-1644; Inherit FetchResponse from RequestOrResponse

---
 core/src/main/scala/kafka/api/FetchResponse.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index af93087..7caa057 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -152,7 +152,8 @@ object FetchResponse {
 
 
 case class FetchResponse(correlationId: Int,
-                         data: Map[TopicAndPartition, FetchResponsePartitionData]) {
+                         data: Map[TopicAndPartition, FetchResponsePartitionData])
+    extends RequestOrResponse() {
 
   /**
    * Partitions the data into a map of maps (one for each topic).
@@ -168,6 +169,14 @@ case class FetchResponse(correlationId: Int,
       folded + topicData.sizeInBytes
     })
 
+  def writeTo(buffer: ByteBuffer): Unit = {
+    buffer.putInt(sizeInBytes)
+    buffer.putInt(correlationId)
+    buffer.putInt(dataGroupedByTopic.size) // topic count
+  }
+
+  override def describe(details: Boolean): String = toString
+
   private def partitionDataFor(topic: String, partition: Int): FetchResponsePartitionData = {
     val topicAndPartition = TopicAndPartition(topic, partition)
     data.get(topicAndPartition) match {
@@ -199,9 +208,7 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
   override def complete = sent >= sendSize
 
   private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
-  buffer.putInt(size)
-  buffer.putInt(fetchResponse.correlationId)
-  buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
+  fetchResponse.writeTo(buffer)
   buffer.rewind()
 
   val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
-- 
1.8.3.2

