From 7b198e6e88aae1a8ca7864dfee11cf78b7e2417d Mon Sep 17 00:00:00 2001 From: David Arthur Date: Sat, 16 Feb 2013 15:47:41 -0500 Subject: [PATCH] KAFKA-759 Remove versionId from OffsetCommitResponse/OffsetFetchResponse --- .../scala/kafka/api/OffsetCommitResponse.scala | 6 +----- .../main/scala/kafka/api/OffsetFetchResponse.scala | 6 +----- core/src/main/scala/kafka/server/KafkaApis.scala | 2 -- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala index 4e1313e..3b0d861 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -29,7 +29,6 @@ object OffsetCommitResponse extends Logging { def readFrom(buffer: ByteBuffer): OffsetCommitResponse = { // Read values from the envelope - val versionId = buffer.getShort val correlationId = buffer.getInt val clientId = readShortString(buffer) @@ -44,12 +43,11 @@ object OffsetCommitResponse extends Logging { (TopicAndPartition(topic, partitionId), error) }) }) - OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId) + OffsetCommitResponse(Map(pairs:_*), correlationId, clientId) } } case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], - versionId: Short = OffsetCommitResponse.CurrentVersion, correlationId: Int = 0, clientId: String = OffsetCommitResponse.DefaultClientId) extends RequestOrResponse { @@ -58,7 +56,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], def writeTo(buffer: ByteBuffer) { // Write envelope - buffer.putShort(versionId) buffer.putInt(correlationId) writeShortString(buffer, clientId) @@ -75,7 +72,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short], } override def sizeInBytes = - 2 + /* versionId */ 4 + /* correlationId */ shortStringLength(clientId) + 4 + /* topic count */ diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala index fb5e6cb..3d4ce2a 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala @@ -29,7 +29,6 @@ object OffsetFetchResponse extends Logging { def readFrom(buffer: ByteBuffer): OffsetFetchResponse = { // Read values from the envelope - val versionId = buffer.getShort val correlationId = buffer.getInt val clientId = readShortString(buffer) @@ -46,12 +45,11 @@ object OffsetFetchResponse extends Logging { (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error)) }) }) - OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId) + OffsetFetchResponse(Map(pairs:_*), correlationId, clientId) } } case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError], - versionId: Short = OffsetFetchResponse.CurrentVersion, correlationId: Int = 0, clientId: String = OffsetFetchResponse.DefaultClientId) extends RequestOrResponse { @@ -60,7 +58,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat def writeTo(buffer: ByteBuffer) { // Write envelope - buffer.putShort(versionId) buffer.putInt(correlationId) writeShortString(buffer, clientId) @@ -79,7 +76,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat } override def sizeInBytes = - 2 + /* versionId */ 4 + /* correlationId */ shortStringLength(clientId) + 4 + /* topic count */ diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9b97ca6..24f1e62 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -489,7 +489,6 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = new OffsetCommitResponse(responseInfo, - offsetCommitRequest.versionId, offsetCommitRequest.correlationId, offsetCommitRequest.clientId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) @@ -521,7 +520,6 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), - offsetFetchRequest.versionId, offsetFetchRequest.correlationId, offsetFetchRequest.clientId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) -- 1.7.5.4