From 24870b783cfb65cdc65b892236dce639609356c4 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 5 Apr 2013 14:39:48 -0400 Subject: [PATCH] KAFKA-852, remove clientId from Offset{Fetch,Commit}Response --- .../scala/kafka/api/OffsetCommitResponse.scala | 15 ++------------- .../main/scala/kafka/api/OffsetFetchResponse.scala | 15 ++------------- core/src/main/scala/kafka/server/KafkaApis.scala | 6 ++---- 3 files changed, 6 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala index 3b0d861..1773921 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -25,14 +25,9 @@ import kafka.utils.Logging object OffsetCommitResponse extends Logging { val CurrentVersion: Short = 0 - val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetCommitResponse = { - // Read values from the envelope val correlationId = buffer.getInt - val clientId = readShortString(buffer) - - // Read the OffsetResponse val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { val topic = readShortString(buffer) @@ -43,23 +38,18 @@ object OffsetCommitResponse extends Logging { (TopicAndPartition(topic, partitionId), error) }) }) - OffsetCommitResponse(Map(pairs:_*), correlationId, clientId) + OffsetCommitResponse(Map(pairs:_*), correlationId) } } case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], - correlationId: Int = 0, - clientId: String = OffsetCommitResponse.DefaultClientId) + correlationId: Int = 0) extends RequestOrResponse { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) def writeTo(buffer: ByteBuffer) { - // Write envelope buffer.putInt(correlationId) - writeShortString(buffer, clientId) - - // Write OffsetCommitResponse buffer.putInt(requestInfoGroupedByTopic.size) // number of topics requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short] writeShortString(buffer, t1._1) // topic @@ -73,7 +63,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], override def sizeInBytes = 4 + /* correlationId */ - shortStringLength(clientId) + 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { val (topic, offsets) = topicAndOffsets diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala index 3d4ce2a..5c8d2c3 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala @@ -25,14 +25,9 @@ import kafka.utils.Logging object OffsetFetchResponse extends Logging { val CurrentVersion: Short = 0 - val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetFetchResponse = { - // Read values from the envelope val correlationId = buffer.getInt - val clientId = readShortString(buffer) - - // Read the OffsetResponse val topicCount = buffer.getInt val pairs = (1 to topicCount).flatMap(_ => { val topic = readShortString(buffer) @@ -45,23 +40,18 @@ object OffsetFetchResponse extends Logging { (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error)) }) }) - OffsetFetchResponse(Map(pairs:_*), correlationId, clientId) + OffsetFetchResponse(Map(pairs:_*), correlationId) } } case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError], - correlationId: Int = 0, - clientId: String = OffsetFetchResponse.DefaultClientId) + correlationId: Int = 0) extends RequestOrResponse { lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic) def writeTo(buffer: ByteBuffer) { - // Write envelope buffer.putInt(correlationId) - writeShortString(buffer, clientId) - - // Write OffsetFetchResponse buffer.putInt(requestInfoGroupedByTopic.size) // number of topics requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError] writeShortString(buffer, t1._1) // topic @@ -77,7 +67,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat override def sizeInBytes = 4 + /* correlationId */ - shortStringLength(clientId) + 4 + /* topic count */ requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => { val (topic, offsets) = topicAndOffsets diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f8faf96..33fc730 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -496,8 +496,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } val response = new OffsetCommitResponse(responseInfo, - offsetCommitRequest.correlationId, - offsetCommitRequest.clientId) + offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } @@ -526,8 +525,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), - offsetFetchRequest.correlationId, - offsetFetchRequest.clientId) + offsetFetchRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } -- 1.7.5.4