From 7c77e039b4ce308402c8711cada12208949a334a Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Wed, 7 Jan 2015 17:51:18 -0800 Subject: [PATCH] remove timestamp from version 0 protocol --- .../org/apache/kafka/common/protocol/Protocol.java | 19 ++++++++++++++++++- .../kafka/common/requests/OffsetCommitRequest.java | 22 +++++++++++++++++----- .../main/scala/kafka/api/OffsetCommitRequest.scala | 22 ++++++++++++++++------ .../kafka/common/OffsetMetadataAndError.scala | 2 +- core/src/main/scala/kafka/server/KafkaApis.scala | 1 + .../api/RequestResponseSerializationTest.scala | 4 ++-- 6 files changed, 55 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 7517b87..2c0141f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -111,6 +111,16 @@ public class Protocol { new Field("offset", INT64, "Message offset to be committed."), + new Field("metadata", + STRING, + "Any associated metadata the client wants to keep.")); + + public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("offset", + INT64, + "Message offset to be committed."), new Field("timestamp", INT64, "Timestamp of the commit"), @@ -125,6 +135,13 @@ public class Protocol { new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0), "Partitions to commit offsets.")); + public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic", + STRING, + "Topic to commit."), + new Field("partitions", + new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1), + "Partitions to commit offsets.")); + public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."), @@ -142,7 +159,7 @@ public class Protocol { STRING, "The consumer id assigned by the group coordinator."), new Field("topics", - new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), "Topics to commit offsets.")); public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 3ee5cba..66c0772 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -47,6 +47,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public static final int DEFAULT_GENERATION_ID = -1; public static final String DEFAULT_CONSUMER_ID = ""; + public static final long DEFAULT_TIMESTAMP = -1L; private final String groupId; private final int generationId; @@ -58,6 +59,11 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public final long timestamp; public final String metadata; + // for v0 + public PartitionData(long offset, String metadata) { + this(offset, DEFAULT_TIMESTAMP, metadata); + } + public PartitionData(long offset, long timestamp, String metadata) { this.offset = offset; this.timestamp = timestamp; @@ -73,7 +79,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { @Deprecated public OffsetCommitRequest(String groupId, Map offsetData) { super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0))); - initCommonFields(groupId, offsetData); + initCommonFields(groupId, offsetData, 0); this.groupId = groupId; this.generationId = DEFAULT_GENERATION_ID; this.consumerId = DEFAULT_CONSUMER_ID; @@ -90,7 +96,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map offsetData) { super(new Struct(curSchema)); - initCommonFields(groupId, offsetData); + initCommonFields(groupId, offsetData, 1); struct.set(GENERATION_ID_KEY_NAME, generationId); struct.set(CONSUMER_ID_KEY_NAME, consumerId); this.groupId = groupId; @@ -99,7 +105,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { this.offsetData = offsetData; } - private void initCommonFields(String groupId, Map offsetData) { + private void initCommonFields(String groupId, Map offsetData, int versionId) { Map> topicsData = CollectionUtils.groupDataByTopic(offsetData); struct.set(GROUP_ID_KEY_NAME, groupId); @@ -113,7 +119,8 @@ public class OffsetCommitRequest extends AbstractRequestResponse { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); - partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); + if (versionId == 1) + partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); partitionArray.add(partitionData); } @@ -133,7 +140,12 @@ public class OffsetCommitRequest extends AbstractRequestResponse { Struct partitionResponse = (Struct) partitionResponseObj; int partition = partitionResponse.getInt(PARTITION_KEY_NAME); long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME); - long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); + long timestamp; + // timestamp only exists in v1 + if (partitionResponse.hasField(TIMESTAMP_KEY_NAME)) + timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); + else + timestamp = DEFAULT_TIMESTAMP; String metadata = partitionResponse.getString(METADATA_KEY_NAME); PartitionData partitionData = new PartitionData(offset, timestamp, metadata); offsetData.put(new TopicPartition(topic, partition), partitionData); diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 861a6cf..5716c8a 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -30,8 +30,6 @@ object OffsetCommitRequest extends Logging { val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetCommitRequest = { - val now = SystemTime.milliseconds - // Read values from the envelope val versionId = buffer.getShort assert(versionId == 0 || versionId == 1, @@ -59,8 +57,11 @@ object OffsetCommitRequest extends Logging { val partitionId = buffer.getInt val offset = buffer.getLong val timestamp = { - val given = buffer.getLong - if (given == -1L) now else given + if (versionId == 1) { + val given = buffer.getLong + given + } else + OffsetAndMetadata.InvalidTime } val metadata = readShortString(buffer) (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp)) @@ -68,6 +69,14 @@ object OffsetCommitRequest extends Logging { }) OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId) } + + def changeInvalidTimeToCurrentTime(offsetCommitRequest: OffsetCommitRequest) { + val now = SystemTime.milliseconds + + for ( (topicAndPartiiton, offsetAndMetadata) <- offsetCommitRequest.requestInfo) + if (offsetAndMetadata.timestamp == OffsetAndMetadata.InvalidTime) + offsetAndMetadata.timestamp = now + } } case class OffsetCommitRequest(groupId: String, @@ -121,7 +130,8 @@ case class OffsetCommitRequest(groupId: String, t1._2.foreach( t2 => { buffer.putInt(t2._1.partition) buffer.putLong(t2._2.offset) - buffer.putLong(t2._2.timestamp) + if (versionId == 1) + buffer.putLong(t2._2.timestamp) writeShortString(buffer, t2._2.metadata) }) }) @@ -143,7 +153,7 @@ case class OffsetCommitRequest(groupId: String, innerCount + 4 /* partition */ + 8 /* offset */ + - 8 /* timestamp */ + + (if (versionId == 1) 8 else 0 ) /* timestamp */ + shortStringLength(offsetAndMetadata._2.metadata) }) }) diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index 4cabffe..db7157d 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -19,7 +19,7 @@ package kafka.common case class OffsetAndMetadata(offset: Long, metadata: String = OffsetAndMetadata.NoMetadata, - timestamp: Long = -1L) { + var timestamp: Long = -1L) { override def toString = "OffsetAndMetadata[%d,%s%s]" .format(offset, if (metadata != null && metadata.length > 0) metadata else "NO_METADATA", diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2f00992..4124887 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -154,6 +154,7 @@ class KafkaApis(val requestChannel: RequestChannel, val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest) (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) } else { (request.requestObj.asInstanceOf[ProducerRequest], None) diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index cd16ced..4e817a2 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -163,8 +163,8 @@ object SerializationTestUtils { versionId = 0, groupId = "group 1", requestInfo = collection.immutable.Map( - TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds), - TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds) + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata"), + TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata) )) } -- 1.8.5.2 (Apple Git-48)