From a85774c1d0203ca889befd9ce44b6c299aad7c69 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Wed, 29 Oct 2014 16:45:54 -0700
Subject: [PATCH 1/5] Protocol

---
 .../org/apache/kafka/common/protocol/Protocol.java | 43 +++++++++++++++++++---
 .../kafka/common/requests/OffsetCommitRequest.java | 40 +++++++++++++++++++-
 2 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..70a31d3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -118,12 +118,29 @@ public class Protocol {
                                                                                    STRING,
                                                                                    "Any associated metadata the client wants to keep."));
 
+    public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                   INT32,
+                                                                                   "Topic partition id."),
+                                                                         new Field("offset",
+                                                                                   INT64,
+                                                                                   "Message offset to be committed."),
+                                                                         new Field("metadata",
+                                                                                   STRING,
+                                                                                   "Any associated metadata the client wants to keep."));
+
     public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                              STRING,
-                                                                              "Topic to commit."),
-                                                                    new Field("partitions",
-                                                                              new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
-                                                                              "Partitions to commit offsets."));
+                                                                               STRING,
+                                                                               "Topic to commit."),
+                                                                     new Field("partitions",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+                                                                               "Partitions to commit offsets."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
+                                                                               STRING,
+                                                                               "Topic to commit."),
+                                                                     new Field("partitions",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
+                                                                               "Partitions to commit offsets."));
 
     public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
                                                                          STRING,
                                                                          "The consumer group id."),
@@ -145,6 +162,22 @@ public class Protocol {
                                                                          new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
                                                                          "Topics to commit offsets."));
 
+    public static Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
+                                                                         STRING,
+                                                                         "The consumer group id."),
+                                                               new Field("group_generation_id",
+                                                                         INT32,
+                                                                         "The generation of the consumer group."),
+                                                               new Field("consumer_id",
+                                                                         STRING,
+                                                                         "The consumer id assigned by the group coordinator."),
+                                                               new Field("retention_time",
+                                                                         INT64,
+                                                                         "Time period in seconds to retain the offset."),
+                                                               new Field("topics",
+                                                                         new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
+                                                                         "Topics to commit offsets."));
+
     public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                     INT32,
                                                                                     "Topic partition id."),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 3ee5cba..25c9549 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -33,6 +33,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     private static String GROUP_ID_KEY_NAME = "group_id";
     private static String GENERATION_ID_KEY_NAME = "group_generation_id";
     private static String CONSUMER_ID_KEY_NAME = "consumer_id";
+    private static String RETENTION_KEY_NAME = "retention_time";
     private static String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
@@ -47,10 +48,13 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
 
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_CONSUMER_ID = "";
+    public static final long DEFAULT_TIMESTAMP = -1;
+    public static final long DEFAULT_RETENTION = -1;
 
     private final String groupId;
     private final int generationId;
     private final String consumerId;
+    private final long retentionTime;
     private final Map<TopicPartition, PartitionData> offsetData;
 
     public static final class PartitionData {
@@ -77,6 +81,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         this.groupId = groupId;
         this.generationId = DEFAULT_GENERATION_ID;
         this.consumerId = DEFAULT_CONSUMER_ID;
+        this.retentionTime = DEFAULT_RETENTION;
         this.offsetData = offsetData;
     }
 
@@ -96,6 +101,28 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         this.groupId = groupId;
         this.generationId = generationId;
         this.consumerId = consumerId;
+        this.retentionTime = DEFAULT_RETENTION;
+        this.offsetData = offsetData;
+    }
+
+    /**
+     * Constructor for version 2.
+     * @param groupId
+     * @param generationId
+     * @param consumerId
+     * @param offsetData
+     */
+    public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
+        super(new Struct(curSchema));
+
+        initCommonFields(groupId, offsetData);
+        struct.set(GENERATION_ID_KEY_NAME, generationId);
+        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
+        struct.set(RETENTION_KEY_NAME, retentionTime);
+        this.groupId = groupId;
+        this.generationId = generationId;
+        this.consumerId = consumerId;
+        this.retentionTime = retentionTime;
         this.offsetData = offsetData;
     }
 
@@ -113,7 +140,8 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+                if (fetchPartitionData.timestamp != DEFAULT_TIMESTAMP)
+                    partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
                 partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
                 partitionArray.add(partitionData);
             }
@@ -133,7 +161,9 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
-                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                long timestamp = DEFAULT_TIMESTAMP;
+                if (partitionResponse.hasField(TIMESTAMP_KEY_NAME))
+                    timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
                 String metadata = partitionResponse.getString(METADATA_KEY_NAME);
                 PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
                 offsetData.put(new TopicPartition(topic, partition), partitionData);
@@ -151,6 +181,12 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
             consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
         else
             consumerId = DEFAULT_CONSUMER_ID;
+
+        // This field only exists in v2
+        if (struct.hasField(RETENTION_KEY_NAME))
+            retentionTime = struct.getLong(RETENTION_KEY_NAME);
+        else
+            retentionTime = DEFAULT_RETENTION;
     }
 
     public String groupId() {
-- 
1.7.12.4
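
[Note] Patch 1 establishes the v2 shape: the per-partition timestamp moves out of the request and a single group-level retention_time comes in, with Struct.hasField guarding fields that only exist in some versions. As a usage sketch (not part of the patch; the group, consumer and retention values below are made up):

    // Building a v2 commit request with the constructor added above.
    Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets =
        new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
    // DEFAULT_TIMESTAMP (-1) means "no explicit timestamp", so toStruct() skips the v0/v1-only field
    offsets.put(new TopicPartition("my-topic", 0),
                new OffsetCommitRequest.PartitionData(42L, OffsetCommitRequest.DEFAULT_TIMESTAMP, "some metadata"));
    OffsetCommitRequest request =
        new OffsetCommitRequest("my-group", 1, "my-consumer", 86400000L /* retention_time */, offsets);
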
From 305ab986613a63c8461a7771db7ad033a97909dc Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 30 Oct 2014 11:37:25 -0700
Subject: [PATCH 2/5] v1

---
 .../org/apache/kafka/common/protocol/Protocol.java |  2 +-
 .../kafka/common/requests/OffsetCommitRequest.java | 49 ++++++++++----------
 .../main/scala/kafka/api/OffsetCommitRequest.scala | 53 +++++++++++++++-------
 .../kafka/common/OffsetMetadataAndError.scala      |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  1 +
 .../main/scala/kafka/server/OffsetManager.scala    | 15 +++++-
 .../api/RequestResponseSerializationTest.scala     | 19 +++++++-
 7 files changed, 96 insertions(+), 45 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 70a31d3..5c36ede 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -173,7 +173,7 @@ public class Protocol {
                                                                          "The consumer id assigned by the group coordinator."),
                                                                new Field("retention_time",
                                                                          INT64,
-                                                                         "Time period in seconds to retain the offset."),
+                                                                         "Time period in ms to retain the offset."),
                                                                new Field("topics",
                                                                          new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
                                                                          "Topics to commit offsets."));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 25c9549..fc27cfb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -33,7 +33,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     private static String GROUP_ID_KEY_NAME = "group_id";
     private static String GENERATION_ID_KEY_NAME = "group_generation_id";
     private static String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static String RETENTION_KEY_NAME = "retention_time";
+    private static String RETENTION_TIME_KEY_NAME = "retention_time";
     private static String TOPICS_KEY_NAME = "topics";
 
     // topic level field names
@@ -49,7 +49,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_CONSUMER_ID = "";
     public static final long DEFAULT_TIMESTAMP = -1;
-    public static final long DEFAULT_RETENTION = -1;
+    public static final long DEFAULT_RETENTION_TIME = -1;
 
     private final String groupId;
     private final int generationId;
@@ -81,7 +81,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         this.groupId = groupId;
         this.generationId = DEFAULT_GENERATION_ID;
         this.consumerId = DEFAULT_CONSUMER_ID;
-        this.retentionTime = DEFAULT_RETENTION;
+        this.retentionTime = DEFAULT_RETENTION_TIME;
         this.offsetData = offsetData;
     }
 
@@ -92,6 +92,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
      * @param consumerId
      * @param offsetData
      */
+    @Deprecated
     public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(curSchema));
 
@@ -101,7 +102,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         this.groupId = groupId;
         this.generationId = generationId;
         this.consumerId = consumerId;
-        this.retentionTime = DEFAULT_RETENTION;
+        this.retentionTime = DEFAULT_RETENTION_TIME;
         this.offsetData = offsetData;
     }
 
@@ -118,7 +119,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         initCommonFields(groupId, offsetData);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        struct.set(RETENTION_KEY_NAME, retentionTime);
+        struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
         this.groupId = groupId;
         this.generationId = generationId;
         this.consumerId = consumerId;
@@ -153,6 +154,26 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     public OffsetCommitRequest(Struct struct) {
         super(struct);
+
+        groupId = struct.getString(GROUP_ID_KEY_NAME);
+        // This field only exists in v1.
+        if (struct.hasField(GENERATION_ID_KEY_NAME))
+            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
+        else
+            generationId = DEFAULT_GENERATION_ID;
+
+        // This field only exists in v1.
+        if (struct.hasField(CONSUMER_ID_KEY_NAME))
+            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
+        else
+            consumerId = DEFAULT_CONSUMER_ID;
+
+        // This field only exists in v2
+        if (struct.hasField(RETENTION_TIME_KEY_NAME))
+            retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME);
+        else
+            retentionTime = DEFAULT_RETENTION_TIME;
+
         offsetData = new HashMap<TopicPartition, PartitionData>();
         for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
             Struct topicResponse = (Struct) topicResponseObj;
@@ -169,24 +190,6 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
                 offsetData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        // This field only exists in v1.
-        if (struct.hasField(GENERATION_ID_KEY_NAME))
-            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        else
-            generationId = DEFAULT_GENERATION_ID;
-
-        // This field only exists in v1.
-        if (struct.hasField(CONSUMER_ID_KEY_NAME))
-            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-        else
-            consumerId = DEFAULT_CONSUMER_ID;
-
-        // This field only exists in v2
-        if (struct.hasField(RETENTION_KEY_NAME))
-            retentionTime = struct.getLong(RETENTION_KEY_NAME);
-        else
-            retentionTime = DEFAULT_RETENTION;
     }
 
     public String groupId() {
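
[Note] Grouping the presence checks at the top of the Struct constructor makes the version story readable at a glance: the code never branches on a version number here, it just asks whether the negotiated schema has the field. For context, a hedged sketch of how that Struct would be produced (ProtoUtils/ApiKeys are the client-side helpers of this era; exact call sites vary):

    // Deserializing a commit request once the api version is known from the request header.
    short version = 2;
    Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, version);
    Struct struct = (Struct) schema.read(buffer);   // older schemas simply lack retention_time
    OffsetCommitRequest request = new OffsetCommitRequest(struct);
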
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 050615c..cfc5dde 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -26,7 +26,7 @@ import kafka.network.RequestChannel.Response
 import scala.collection._
 
 object OffsetCommitRequest extends Logging {
-  val CurrentVersion: Short = 1
+  val CurrentVersion: Short = 2
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
@@ -34,8 +34,8 @@ object OffsetCommitRequest extends Logging {
 
     // Read values from the envelope
     val versionId = buffer.getShort
-    assert(versionId == 0 || versionId == 1,
-           "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.")
+    assert(versionId == 0 || versionId == 1 || versionId == 2,
+           "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0, 1 or 2.")
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
 
@@ -43,14 +43,20 @@ object OffsetCommitRequest extends Logging {
     // Read the OffsetRequest
     val consumerGroupId = readShortString(buffer)
 
-    // version 1 specific fields
+    // version 1 and 2 specific fields
     var groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
     var consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
-    if (versionId == 1) {
+    if (versionId >= 1) {
       groupGenerationId = buffer.getInt
       consumerId = readShortString(buffer)
     }
 
+    // version 2 specific fields
+    var retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME
+    if (versionId == 2) {
+      retentionMs = buffer.getLong
+    }
+
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
       val topic = readShortString(buffer)
@@ -59,14 +65,18 @@ object OffsetCommitRequest extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val timestamp = {
-          val given = buffer.getLong
-          if (given == -1L) now else given
+          if (versionId <= 1) {
+            val given = buffer.getLong
+            if (given == -1L) now else given
+          }
+          else
+            -1L
         }
         val metadata = readShortString(buffer)
         (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
       })
     })
-    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId)
+    OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId, retentionMs)
   }
 }
 
@@ -76,11 +86,12 @@ case class OffsetCommitRequest(groupId: String,
                                correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId,
                                groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                               consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID)
+                               consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID,
+                               retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
     extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
 
-  assert(versionId == 0 || versionId == 1,
-         "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0 or 1.")
+  assert(versionId == 0 || versionId == 1 || versionId == 2,
+         "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0, 1 or 2.")
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
@@ -93,11 +104,17 @@ case class OffsetCommitRequest(groupId: String,
 
     // Write OffsetCommitRequest
     writeShortString(buffer, groupId) // consumer group
 
-    // version 1 specific data
-    if (versionId == 1) {
+    // version 1 and 2 specific data
+    if (versionId >= 1) {
       buffer.putInt(groupGenerationId)
       writeShortString(buffer, consumerId)
     }
+
+    // version 2 specific data
+    if (versionId == 2) {
+      buffer.putLong(retentionMs)
+    }
+
     buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
     requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
       writeShortString(buffer, t1._1) // topic
@@ -105,7 +122,9 @@ case class OffsetCommitRequest(groupId: String,
       t1._2.foreach( t2 => {
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
-        buffer.putLong(t2._2.timestamp)
+        // version 0 and 1 specific data
+        if (versionId <= 1)
+          buffer.putLong(t2._2.timestamp)
         writeShortString(buffer, t2._2.metadata)
       })
     })
@@ -116,7 +135,8 @@ case class OffsetCommitRequest(groupId: String,
     4 + /* correlationId */
     shortStringLength(clientId) +
     shortStringLength(groupId) +
-    (if (versionId == 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) +
+    (if (versionId >= 1) 4 /* group generation id */ + shortStringLength(consumerId) else 0) +
+    (if (versionId == 2) 8 /* retention time */ else 0) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
       val (topic, offsets) = topicAndOffsets
@@ -127,7 +147,7 @@ case class OffsetCommitRequest(groupId: String,
         innerCount +
         4 /* partition */ +
         8 /* offset */ +
-        8 /* timestamp */ +
+        (if (versionId <= 1) 8 else 0) /* timestamp */ +
         shortStringLength(offsetAndMetadata._2.metadata)
       })
     })
@@ -149,6 +169,7 @@ case class OffsetCommitRequest(groupId: String,
     offsetCommitRequest.append("; GroupId: " + groupId)
     offsetCommitRequest.append("; GroupGenerationId: " + groupGenerationId)
     offsetCommitRequest.append("; ConsumerId: " + consumerId)
+    offsetCommitRequest.append("; RetentionMs: " + retentionMs)
     if(details)
       offsetCommitRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetCommitRequest.toString()
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 4cabffe..db7157d 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -19,7 +19,7 @@ package kafka.common
 
 case class OffsetAndMetadata(offset: Long,
                              metadata: String = OffsetAndMetadata.NoMetadata,
-                             timestamp: Long = -1L) {
+                             var timestamp: Long = -1L) {
   override def toString = "OffsetAndMetadata[%d,%s%s]"
     .format(offset,
             if (metadata != null && metadata.length > 0) metadata else "NO_METADATA",
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 968b0c4..8db5dc3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -147,6 +147,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetCommitRequest.groupId,
       offsetCommitRequest.consumerId,
       offsetCommitRequest.groupGenerationId,
+      offsetCommitRequest.retentionMs,
       offsetCommitRequest.requestInfo,
       sendResponseCallback)
   }
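
[Note] The sizeInBytes arithmetic above is where the wire change pays off: v2 adds a fixed 8 bytes per request and drops 8 bytes per partition. A worked sketch of the per-partition term, assuming the 2-byte length prefix that shortStringLength implies (illustrative helper, not patch code):

    // Per-partition size under the two layouts.
    static int partitionEntrySize(int versionId, String metadata) {
        return 4                                  // partition id
             + 8                                  // offset
             + (versionId <= 1 ? 8 : 0)           // timestamp, serialized only for v0/v1
             + 2 + metadata.getBytes().length;    // short string: 2-byte length prefix + UTF-8 bytes
    }
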
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 2957bc4..a648ec9 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -209,6 +209,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   def storeOffsets(groupName: String,
                    consumerId: String,
                    generationId: Int,
+                   retentionMs: Long,
                    offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
 
@@ -219,10 +220,16 @@ class OffsetManager(val config: OffsetManagerConfig,
     }
 
     // construct the message set to append
+    val timestamp =
+      if (retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME
+      else
+        retentionMs + SystemTime.milliseconds
+
     val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
       new Message(
         key = OffsetManager.offsetCommitKey(groupName, topicAndPartition.topic, topicAndPartition.partition),
-        bytes = OffsetManager.offsetCommitValue(offsetAndMetadata)
+        bytes = OffsetManager.offsetCommitValue(offsetAndMetadata, timestamp)
       )
     }.toSeq
 
@@ -483,12 +490,16 @@ object OffsetManager {
    * Generates the payload for offset commit message from given offset and metadata
    *
    * @param offsetAndMetadata consumer's current offset and metadata
+   * @param timestamp global timestamp of the offset
    * @return payload for offset commit message
    */
-  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata, timestamp: Long): Array[Byte] = {
     val value = new Struct(CURRENT_SCHEMA.valueSchema)
     value.set(VALUE_OFFSET_FIELD, offsetAndMetadata.offset)
     value.set(VALUE_METADATA_FIELD, offsetAndMetadata.metadata)
+    // maybe override the timestamp with the global timestamp
+    if (timestamp != org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
+      offsetAndMetadata.timestamp = timestamp
     value.set(VALUE_TIMESTAMP_FIELD, offsetAndMetadata.timestamp)
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..a4ed2cd 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -151,8 +151,21 @@ object SerializationTestUtils {
     new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
   }
 
+  def createTestOffsetCommitRequestV2: OffsetCommitRequest = {
+    new OffsetCommitRequest(
+      groupId = "group 1",
+      retentionMs = SystemTime.milliseconds,
+      requestInfo = collection.immutable.Map(
+        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata"),
+        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata)
+      ))
+  }
+
   def createTestOffsetCommitRequestV1: OffsetCommitRequest = {
-    new OffsetCommitRequest("group 1", collection.immutable.Map(
+    new OffsetCommitRequest(
+      versionId = 1,
+      groupId = "group 1",
+      requestInfo = collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
       TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
     ))
@@ -232,6 +245,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
   private val offsetCommitRequestV0 = SerializationTestUtils.createTestOffsetCommitRequestV0
   private val offsetCommitRequestV1 = SerializationTestUtils.createTestOffsetCommitRequestV1
+  private val offsetCommitRequestV2 = SerializationTestUtils.createTestOffsetCommitRequestV2
   private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
   private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
@@ -250,7 +264,8 @@ class RequestResponseSerializationTest extends JUnitSuite {
     collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse, stopReplicaRequest,
                              stopReplicaResponse, producerRequest, producerResponse,
                              fetchRequest, offsetRequest, offsetResponse, topicMetadataRequest,
-                             topicMetadataResponse, offsetCommitRequestV0, offsetCommitRequestV1,
+                             topicMetadataResponse,
+                             offsetCommitRequestV0, offsetCommitRequestV1, offsetCommitRequestV2,
                              offsetCommitResponse, offsetFetchRequest, offsetFetchResponse,
                              consumerMetadataRequest, consumerMetadataResponse,
                              consumerMetadataResponseNoCoordinator, heartbeatRequest,
-- 
1.7.12.4
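
[Note] The substantive change in patch 2 is the meaning of the stored timestamp: once a v2 request supplies a retention, OffsetManager writes retentionMs + now, so the offsets topic starts carrying an absolute expiration deadline rather than a commit time. Roughly, as a hedged sketch (illustrative names; the Scala above is authoritative):

    // Timestamp written to the offsets topic at this stage of the series.
    static long commitTimestamp(long retentionMs, long now) {
        return (retentionMs == -1L)   // DEFAULT_RETENTION_TIME sentinel for v0/v1 requests
                ? -1L                 // keep legacy commit-time semantics
                : retentionMs + now;  // absolute expiration deadline
    }
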
From 9185380888c711e470f5d40ce949f4f80b34823b Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 30 Oct 2014 11:40:53 -0700
Subject: [PATCH 3/5] last pass

---
 core/src/main/scala/kafka/server/OffsetManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index a648ec9..4837a0c 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -497,7 +497,7 @@ object OffsetManager {
     val value = new Struct(CURRENT_SCHEMA.valueSchema)
     value.set(VALUE_OFFSET_FIELD, offsetAndMetadata.offset)
     value.set(VALUE_METADATA_FIELD, offsetAndMetadata.metadata)
-    // maybe override the timestamp with the global timestamp
+    // maybe override the timestamp with the global value
     if (timestamp != org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
       offsetAndMetadata.timestamp = timestamp
     value.set(VALUE_TIMESTAMP_FIELD, offsetAndMetadata.timestamp)
-- 
1.7.12.4

From dcf445b4138e8c00b3d4804838c742d6e3006559 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 30 Oct 2014 11:42:54 -0700
Subject: [PATCH 4/5] minor TODO

---
 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index db7157d..46b6306 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -20,6 +20,8 @@ package kafka.common
 case class OffsetAndMetadata(offset: Long,
                              metadata: String = OffsetAndMetadata.NoMetadata,
                              var timestamp: Long = -1L) {
+  // TODO: timestamp needs to be removed after we deprecate OffsetCommitRequest version 0 and 1
+
   override def toString = "OffsetAndMetadata[%d,%s%s]"
     .format(offset,
             if (metadata != null && metadata.length > 0) metadata else "NO_METADATA",
-- 
1.7.12.4
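
[Note] The TODO added in patch 4 flags the cost of patch 2's change of timestamp to a var: offsetCommitValue now mutates a value object in passing. A hypothetical immutable alternative, purely illustrative (the series keeps the var until v0/v1 can be dropped):

    // Copy-on-write instead of in-place mutation.
    final class OffsetAndMetadata {
        final long offset;
        final String metadata;
        final long timestamp;
        OffsetAndMetadata(long offset, String metadata, long timestamp) {
            this.offset = offset;
            this.metadata = metadata;
            this.timestamp = timestamp;
        }
        OffsetAndMetadata withTimestamp(long ts) {
            return new OffsetAndMetadata(offset, metadata, ts);
        }
    }
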
From a88cbf2bfd00226b2855dbf471a2ffab53417e3d Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Thu, 6 Nov 2014 15:35:07 -0800
Subject: [PATCH 5/5] Incorporate Joel's comments

---
 .../org/apache/kafka/common/protocol/Protocol.java |  6 +-
 .../kafka/common/requests/OffsetCommitRequest.java | 21 ++++--
 .../kafka/common/requests/RequestResponseTest.java |  4 +-
 .../main/scala/kafka/api/OffsetCommitRequest.scala | 34 ++++++----
 core/src/main/scala/kafka/server/KafkaApis.scala   | 12 +++-
 core/src/main/scala/kafka/server/KafkaServer.scala |  1 +
 .../main/scala/kafka/server/OffsetManager.scala    | 26 ++++----
 .../scala/unit/kafka/server/OffsetCommitTest.scala | 75 +++++++++++++++++++++-
 8 files changed, 136 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 5c36ede..e9f658d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -191,9 +191,9 @@ public class Protocol {
     public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
                                                                           new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
-    /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
-    public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+    public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2 };
+    /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
+    public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0 };
 
     /* Offset fetch api */
     public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
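
[Note] The schema arrays are indexed by api version, so registering v2 is just appending to both arrays; reusing OFFSET_COMMIT_RESPONSE_V0 three times records that the response wire format never changed. An invariant worth stating, as a sketch rather than code from the patch:

    // One schema per supported version, for requests and responses alike.
    assert Protocol.OFFSET_COMMIT_REQUEST.length == Protocol.OFFSET_COMMIT_RESPONSE.length;
    short latestVersion = (short) (Protocol.OFFSET_COMMIT_REQUEST.length - 1);  // == 2 after this patch
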
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index fc27cfb..8fd0d0c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -49,7 +49,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_CONSUMER_ID = "";
     public static final long DEFAULT_TIMESTAMP = -1;
-    public static final long DEFAULT_RETENTION_TIME = -1;
+    public static final long DEFAULT_RETENTION_TIME = Long.MAX_VALUE;
 
     private final String groupId;
     private final int generationId;
@@ -62,11 +62,18 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         public final long timestamp;
         public final String metadata;
 
+        @Deprecated
         public PartitionData(long offset, long timestamp, String metadata) {
             this.offset = offset;
             this.timestamp = timestamp;
             this.metadata = metadata;
         }
+
+        public PartitionData(long offset, String metadata) {
+            this.offset = offset;
+            this.timestamp = DEFAULT_TIMESTAMP;
+            this.metadata = metadata;
+        }
     }
 
     /**
@@ -111,6 +118,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
      * @param groupId
      * @param generationId
     * @param consumerId
+     * @param retentionTime
      * @param offsetData
      */
     public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
@@ -182,11 +190,14 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
-                long timestamp = DEFAULT_TIMESTAMP;
-                if (partitionResponse.hasField(TIMESTAMP_KEY_NAME))
-                    timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
                 String metadata = partitionResponse.getString(METADATA_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
+                PartitionData partitionData;
+                if (partitionResponse.hasField(TIMESTAMP_KEY_NAME)) {
+                    long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                    partitionData = new PartitionData(offset, timestamp, metadata);
+                } else {
+                    partitionData = new PartitionData(offset, metadata);
+                }
                 offsetData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index df37fc6..662d9f7 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -139,8 +139,8 @@ public class RequestResponseTest {
 
     private AbstractRequestResponse createOffsetCommitRequest() {
         Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
-        commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, 1000000, ""));
-        return new OffsetCommitRequest("group1", 100, "consumer1", commitData);
+        commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
+        return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
     }
 
     private AbstractRequestResponse createOffsetCommitResponse() {
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index cfc5dde..c1e59bf 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -44,18 +44,26 @@ object OffsetCommitRequest extends Logging {
     val consumerGroupId = readShortString(buffer)
 
     // version 1 and 2 specific fields
-    var groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
-    var consumerId: String = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
-    if (versionId >= 1) {
-      groupGenerationId = buffer.getInt
-      consumerId = readShortString(buffer)
-    }
+    val groupGenerationId: Int =
+      if (versionId >= 1) {
+        buffer.getInt
+      } else {
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
+      }
+    val consumerId: String =
+      if (versionId >= 1) {
+        readShortString(buffer)
+      } else {
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
+      }
 
     // version 2 specific fields
-    var retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME
-    if (versionId == 2) {
-      retentionMs = buffer.getLong
-    }
+    val retentionMs: Long =
+      if (versionId == 2) {
+        buffer.getLong
+      } else {
+        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME
+      }
 
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
@@ -65,10 +73,8 @@ object OffsetCommitRequest extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val timestamp = {
-          if (versionId <= 1) {
-            val given = buffer.getLong
-            if (given == -1L) now else given
-          }
+          if (versionId <= 1)
+            buffer.getLong
           else
             -1L
         }
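
[Note] Dropping the `if (given == -1L) now else given` defaulting from readFrom is deliberate: the -1 sentinel must survive to the broker so that KafkaApis and OffsetManager below can apply retention uniformly. The end-to-end resolution, condensed into a hedged sketch (all names here are illustrative):

    static long storedTimestamp(int versionId, long requestRetentionMs,
                                long brokerRetentionMs, long clientTimestamp, long now) {
        long retentionMs = (versionId <= 1) ? brokerRetentionMs   // config value for legacy requests
                                            : requestRetentionMs; // explicit v2 retention_time
        long expiration = retentionMs + now;
        return (clientTimestamp == -1L)   // DEFAULT_TIMESTAMP
                ? expiration              // common case: apply the global deadline
                : clientTimestamp;        // explicit v0/v1 timestamp becomes the deadline as-is
    }
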
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8db5dc3..3e17c82 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -142,12 +142,22 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
 
+    // compute the retention time based on the request version:
+    // if it is before v2, we can use the default retention period config
+    val offsetRetention =
+      if (offsetCommitRequest.versionId <= 1) {
+        offsetManager.config.offsetsRetentionMs
+      } else {
+        offsetCommitRequest.retentionMs
+      }
+
+    // call offset manager to store offsets
     offsetManager.storeOffsets(
       offsetCommitRequest.groupId,
       offsetCommitRequest.consumerId,
       offsetCommitRequest.groupGenerationId,
-      offsetCommitRequest.retentionMs,
+      offsetRetention,
      offsetCommitRequest.requestInfo,
      sendResponseCallback)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 4de8123..247d76f 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -352,6 +352,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
       offsetsTopicNumPartitions = config.offsetsTopicPartitions,
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 4837a0c..99894ea 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -117,9 +117,9 @@ class OffsetManager(val config: OffsetManagerConfig,
     debug("Compacting offsets cache.")
     val startMs = SystemTime.milliseconds
 
-    val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)
+    val staleOffsets = offsetsCache.filter(_._2.timestamp < startMs)
 
-    debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))
+    debug("Found %d stale offsets (expiration timestamp passed already).".format(staleOffsets.size))
 
     // delete the stale offsets from the table and generate tombstone messages to remove them from the log
     val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
@@ -219,17 +219,14 @@ class OffsetManager(val config: OffsetManagerConfig,
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
-    // construct the message set to append
-    val timestamp =
-      if (retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
-        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME
-      else
-        retentionMs + SystemTime.milliseconds
+    // compute the global expiration timestamp
+    val expirationTimestamp = retentionMs + SystemTime.milliseconds
 
+    // construct the message set to append
     val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
       new Message(
         key = OffsetManager.offsetCommitKey(groupName, topicAndPartition.topic, topicAndPartition.partition),
-        bytes = OffsetManager.offsetCommitValue(offsetAndMetadata, timestamp)
+        bytes = OffsetManager.offsetCommitValue(offsetAndMetadata, expirationTimestamp)
       )
     }.toSeq
 
@@ -490,18 +487,17 @@ object OffsetManager {
    * Generates the payload for offset commit message from given offset and metadata
    *
    * @param offsetAndMetadata consumer's current offset and metadata
-   * @param timestamp global timestamp of the offset
+   * @param expirationTimestamp global expiration time of the offset
    * @return payload for offset commit message
    */
-  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata, timestamp: Long): Array[Byte] = {
+  def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata, expirationTimestamp: Long): Array[Byte] = {
     val value = new Struct(CURRENT_SCHEMA.valueSchema)
     value.set(VALUE_OFFSET_FIELD, offsetAndMetadata.offset)
     value.set(VALUE_METADATA_FIELD, offsetAndMetadata.metadata)
-    // maybe override the timestamp with the global value
-    if (timestamp != org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
-      offsetAndMetadata.timestamp = timestamp
+    // if the expiration timestamp is not specified, use the global value
+    if (offsetAndMetadata.timestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+      offsetAndMetadata.timestamp = expirationTimestamp
     value.set(VALUE_TIMESTAMP_FIELD, offsetAndMetadata.timestamp)
-
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
     byteBuffer.putShort(CURRENT_OFFSET_SCHEMA_VERSION)
     value.writeTo(byteBuffer)
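
[Note] With every stored timestamp now an absolute deadline, the compaction pass no longer consults the retention config at all; the staleness check flips from age-based to deadline-based. Side by side, as an illustrative comparison mirroring the Scala filter above:

    static boolean staleOld(long commitTs, long retentionMs, long now) {
        return now - commitTs > retentionMs;   // before: commit time older than the retention window
    }
    static boolean staleNew(long expirationTs, long now) {
        return expirationTs < now;             // after: the stored deadline has passed
    }
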
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 8c5364f..08ce467 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -33,12 +33,13 @@ import scala.collection._
 
 class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   val random: Random = new Random()
+  val brokerPort: Int = 9099
+  val group = "test-group"
+  val retentionCheckInterval: Long = 100L
   var logDir: File = null
   var topicLogDir: File = null
   var server: KafkaServer = null
   var logSize: Int = 100
-  val brokerPort: Int = 9099
-  val group = "test-group"
   var simpleConsumer: SimpleConsumer = null
   var time: Time = new MockTime()
 
@@ -46,6 +47,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
     val config: Properties = createBrokerConfig(1, brokerPort)
+    config.put("offsets.retention.check.interval.ms", retentionCheckInterval.toString)
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     time = new MockTime()
@@ -68,7 +70,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     Utils.rm(logDir)
     super.tearDown()
   }
-
+  /*
   @Test
   def testUpdateOffsets() {
     val topic = "topic"
@@ -197,6 +199,73 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
     assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
+  }
+  */
+  @Test
+  def testOffsetExpiration() {
+    // set up topic partition
+    val topic = "topic"
+    val topicPartition = TopicAndPartition(topic, 0)
+    createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1)
+
+    val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
+
+    // v0 version commit request with timestamp set to -1
+    // should not expire
+    val commitRequest0 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata", -1)),
+      versionId = 0
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v1 version commit request with timestamp set to -1
+    // should not expire
+    val commitRequest1 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(2L, "metadata", -1)),
+      versionId = 1
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest1).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(2L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v1 version commit request with timestamp set to now
+    // should expire
+    val commitRequest2 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(3L, "metadata", SystemTime.milliseconds)),
+      versionId = 1
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest2).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v2 version commit request with retention time set to 1 hour
+    // should not expire
+    val commitRequest3 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(4L, "metadata", -1)),
+      versionId = 2,
+      retentionMs = 1000 * 60 * 60
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest3).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(4L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // v2 version commit request with retention time set to 0
+    // should expire
+    val commitRequest4 = OffsetCommitRequest(
+      groupId = "test-group",
+      requestInfo = immutable.Map(TopicAndPartition(topic, 0) -> OffsetAndMetadata(5L, "metadata", -1)),
+      versionId = 2,
+      retentionMs = 0
+    )
+    assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest4).commitStatus.get(topicPartition).get)
+    Thread.sleep(retentionCheckInterval * 2)
+    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
   }
 }
-- 
1.7.12.4
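
[Note] For completeness, the broker-side knobs this test exercises, as a hedged sketch: "offsets.retention.check.interval.ms" appears verbatim above, while "offsets.retention.minutes" is an assumption for the property behind config.offsetsRetentionMinutes.

    Properties brokerConfig = new Properties();
    brokerConfig.put("offsets.retention.minutes", "1440");              // default retention for v0/v1 commits
    brokerConfig.put("offsets.retention.check.interval.ms", "600000");  // cadence of the expiration sweep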