From 805f4594fefcf1514b1f6650ca50e7bfcc8dc3a6 Mon Sep 17 00:00:00 2001
From: Guozhang Wang
Date: Sat, 25 Apr 2015 00:55:08 -0700
Subject: [PATCH] K2068.P1

---
 .../org/apache/kafka/common/protocol/Protocol.java |  35 ++++-
 .../kafka/common/requests/OffsetCommitRequest.java |   7 +-
 .../main/scala/kafka/api/OffsetCommitRequest.scala |   9 +-
 .../main/scala/kafka/api/OffsetFetchRequest.scala  |   2 +-
 .../kafka/common/OffsetMetadataAndError.scala      |   4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 145 +++++++++++++-
 .../api/RequestResponseSerializationTest.scala     |   4 +-
 .../scala/unit/kafka/server/OffsetCommitTest.scala |  15 +-
 8 files changed, 151 insertions(+), 70 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index d53fe45..34e934b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -118,6 +118,16 @@ public class Protocol {
                                                                              new Field("offset",
                                                                                        INT64,
                                                                                        "Message offset to be committed."),
+                                                                             new Field("metadata",
+                                                                                       STRING,
+                                                                                       "Any associated metadata the client wants to keep."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                         INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         INT64,
+                                                                                         "Message offset to be committed."),
                                                                              new Field("timestamp",
                                                                                        INT64,
                                                                                        "Timestamp of the commit"),
@@ -125,7 +135,7 @@ public class Protocol {
                                                                                        STRING,
                                                                                        "Any associated metadata the client wants to keep."));
 
-    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
                                                                                          INT32,
                                                                                          "Topic partition id."),
                                                                                new Field("offset",
@@ -149,6 +159,13 @@ public class Protocol {
                                                                            new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
                                                                            "Partitions to commit offsets."));
 
+    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
+                                                                                     STRING,
+                                                                                     "Topic to commit."),
+                                                                           new Field("partitions",
+                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
+                                                                                     "Partitions to commit offsets."));
+
     public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
                                                                                STRING,
                                                                                "The consumer group id."),
@@ -166,7 +183,7 @@ public class Protocol {
                                                                                STRING,
                                                                                "The consumer id assigned by the group coordinator."),
                                                                      new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
                                                                                "Topics to commit offsets."));
 
     public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
@@ -182,7 +199,7 @@ public class Protocol {
                                                                                INT64,
                                                                                "Time period in ms to retain the offset."),
                                                                      new Field("topics",
-                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
                                                                                "Topics to commit offsets."));
 
     public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
@@ -203,6 +220,9 @@ public class Protocol {
     public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
 
     /* Offset fetch api */
+
+    /* version 0 and 1 have exactly the same wire format, but different functionality: */
+    /* version 0 will read the offsets from ZK; */
     public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
                                                                                         INT32,
                                                                                         "Topic partition id."));
@@ -221,6 +241,9 @@ public class Protocol {
                                                                     new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
                                                                     "Topics to fetch offsets."));
 
+    /* version 1 will read the offsets from Kafka. */
+    public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
+
     public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                          INT32,
                                                                                          "Topic partition id."),
@@ -239,8 +262,10 @@ public class Protocol {
     public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                                new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
 
-    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0};
-    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0};
+    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1};
+
+    /* The response types for both V0 and V1 of OFFSET_FETCH_RESPONSE are the same. */
+    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V0};
 
     /* List offset api */
     public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
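The schema arrays at the end of each API section above are indexed by version id, which is how both sides agree on a wire format: bumping OffsetCommitRequest to v2 and OffsetFetchRequest to v1 amounts to appending one more entry. A minimal sketch, not part of the patch, of how a versioned schema is resolved; ProtoUtils.requestSchema is the existing lookup helper used by the client request classes, while the object and value names here are illustrative:

// Sketch only: resolving a request schema by API key and version.
import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils}

object SchemaLookupSketch {
  def main(args: Array[String]): Unit = {
    // v2 commit schema: retention_time at the top level, no per-partition timestamp
    val commitV2 = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 2)
    // v0 and v1 fetch schemas resolve to the same object, since
    // OFFSET_FETCH_REQUEST_V1 is declared above as an alias of the v0 schema
    val fetchV0 = ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, 0)
    val fetchV1 = ProtoUtils.requestSchema(ApiKeys.OFFSET_FETCH.id, 1)
    println(commitV2)
    println(fetchV0 eq fetchV1) // expected: true
  }
}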
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index a0e1976..8bf6cbb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -68,7 +68,7 @@ public class OffsetCommitRequest extends AbstractRequest {
 
     public static final class PartitionData {
         @Deprecated
-        public final long timestamp;                // for V0, V1
+        public final long timestamp;                // for V1
         public final long offset;
         public final String metadata;
 
@@ -93,6 +93,7 @@ public class OffsetCommitRequest extends AbstractRequest {
     @Deprecated
     public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
+
         initCommonFields(groupId, offsetData);
         this.groupId = groupId;
         this.generationId = DEFAULT_GENERATION_ID;
@@ -159,7 +160,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                // Only for v0 and v1
+                // Only for v1
                 if (partitionData.hasField(TIMESTAMP_KEY_NAME))
                     partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
                 partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
@@ -203,7 +204,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
                 String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
                 PartitionData partitionOffset;
-                // This field only exists in v0 and v1
+                // This field only exists in v1
                 if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) {
                     long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME);
                     partitionOffset = new PartitionData(offset, timestamp, metadata);
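After this change the per-partition timestamp only travels on the wire for v1, so client code populates PartitionData differently per version. A hedged sketch; the two-argument constructor (offset, metadata) is assumed to be the non-deprecated path implied by the deprecated timestamp field above, and the numeric values are illustrative:

// Sketch: per-version commit data after this change.
import org.apache.kafka.common.requests.OffsetCommitRequest.PartitionData

val v2Entry = new PartitionData(42L, "some metadata")                  // broker stamps the commit time itself
val v1Entry = new PartitionData(42L, 1429945200000L, "some metadata")  // deprecated form: explicit timestamp on the wire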
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index cf8e6ac..317daed 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -69,7 +69,8 @@ object OffsetCommitRequest extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val timestamp = {
-          if (versionId <= 1)
+          // version 1 specific field
+          if (versionId == 1)
             buffer.getLong
           else
             org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
@@ -126,8 +127,8 @@ case class OffsetCommitRequest(groupId: String,
       t1._2.foreach( t2 => {
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
-        // version 0 and 1 specific data
-        if (versionId <= 1)
+        // version 1 specific data
+        if (versionId == 1)
           buffer.putLong(t2._2.commitTimestamp)
         writeShortString(buffer, t2._2.metadata)
       })
@@ -151,7 +152,7 @@ case class OffsetCommitRequest(groupId: String,
         innerCount +
         4 /* partition */ +
         8 /* offset */ +
-        (if (versionId <= 1) 8 else 0) /* timestamp */ +
+        (if (versionId == 1) 8 else 0) /* timestamp */ +
         shortStringLength(offsetAndMetadata._2.metadata)
       })
     })
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 67811a7..fa8bd6a 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -29,7 +29,7 @@ import scala.Some
 import java.nio.ByteBuffer
 
 object OffsetFetchRequest extends Logging {
-  val CurrentVersion: Short = 0
+  val CurrentVersion: Short = 1
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 139913f..6b4242c 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -34,9 +34,9 @@ case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
                              commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
                              expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {
 
-  def offset() = offsetMetadata.offset
+  def offset = offsetMetadata.offset
 
-  def metadata() = offsetMetadata.metadata
+  def metadata = offsetMetadata.metadata
 
   override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp)
 }
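The sizeInBytes change in OffsetCommitRequest.scala above is easy to sanity-check by hand: a per-partition entry is 4 bytes of partition id, 8 bytes of offset, 8 bytes of timestamp only when versionId == 1 (v0 loses the field under this patch, v2 never had it), and a length-prefixed short string for metadata. A worked example with illustrative values:

// Worked example of the per-partition entry sizes computed in sizeInBytes above.
val metadata = "some metadata"                  // 13 characters => N = 13
val v1Size = 4 + 8 + 8 + (2 + metadata.length)  // 35 bytes: timestamp included (v1 only)
val v0Size = 4 + 8 + (2 + metadata.length)      // 27 bytes: v0 no longer carries the timestamp
val v2Size = v0Size                             // 27 bytes: v2 never carried it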
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b4004aa..417960d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -28,7 +28,7 @@ import kafka.coordinator.ConsumerCoordinator
 import kafka.log._
 import kafka.network._
 import kafka.network.RequestChannel.Response
-import kafka.utils.{SystemTime, Logging}
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
 
 import scala.collection._
 
@@ -161,44 +161,70 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
 
-    // compute the retention time based on the request version:
-    // if it is before v2 or not specified by user, we can use the default retention
-    val offsetRetention =
-      if (offsetCommitRequest.versionId <= 1 ||
-        offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
-        offsetManager.config.offsetsRetentionMs
-      } else {
-        offsetCommitRequest.retentionMs
+    if (offsetCommitRequest.versionId == 0) {
+      // for version 0 always store offsets to ZK
+      val responseInfo = offsetCommitRequest.requestInfo.map {
+        case (topicAndPartition, metaAndError) => {
+          val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+          try {
+            if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
+              (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+            } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
+              (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+            } else {
+              ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+                topicAndPartition.partition, metaAndError.offset.toString)
+              (topicAndPartition, ErrorMapping.NoError)
+            }
+          } catch {
+            case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+          }
+        }
       }
 
-    // commit timestamp is always set to now.
-    // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
-    // expire timestamp is computed differently for v1 and v2.
-    //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
-    //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
-    //   - If v2 we use the default expiration timestamp
-    val currentTimestamp = SystemTime.milliseconds
-    val defaultExpireTimestamp = offsetRetention + currentTimestamp
-
-    val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata =>
-      offsetAndMetadata.copy(
-        commitTimestamp = currentTimestamp,
-        expireTimestamp = {
-          if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-            defaultExpireTimestamp
-          else
-            offsetRetention + offsetAndMetadata.commitTimestamp
+      sendResponseCallback(responseInfo)
+    } else {
+      // for version 1 and beyond store offsets in offset manager
+
+      // compute the retention time based on the request version:
+      // if it is v1 or not specified by user, we can use the default retention
+      val offsetRetention =
+        if (offsetCommitRequest.versionId <= 1 ||
+          offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
+          offsetManager.config.offsetsRetentionMs
+        } else {
+          offsetCommitRequest.retentionMs
         }
+
+      // commit timestamp is always set to now.
+      // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+      // expire timestamp is computed differently for v1 and v2.
+      //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+      //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
+      //   - If v2 we use the default expiration timestamp
+      val currentTimestamp = SystemTime.milliseconds
+      val defaultExpireTimestamp = offsetRetention + currentTimestamp
+
+      val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata =>
+        offsetAndMetadata.copy(
+          commitTimestamp = currentTimestamp,
+          expireTimestamp = {
+            if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+              defaultExpireTimestamp
+            else
+              offsetRetention + offsetAndMetadata.commitTimestamp
+          }
+        )
       )
-    )
-
-    // call offset manager to store offsets
-    offsetManager.storeOffsets(
-      offsetCommitRequest.groupId,
-      offsetCommitRequest.consumerId,
-      offsetCommitRequest.groupGenerationId,
-      offsetData,
-      sendResponseCallback)
+
+      // call offset manager to store offsets
+      offsetManager.storeOffsets(
+        offsetCommitRequest.groupId,
+        offsetCommitRequest.consumerId,
+        offsetCommitRequest.groupGenerationId,
+        offsetData,
+        sendResponseCallback)
+    }
   }
 
   /**
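To make the retention arithmetic in the v1+ branch above concrete, here is a worked example; the 24-hour retention value is an assumed broker default (offsets.retention.minutes = 1440), not something this patch sets:

// Worked example of the expiration logic in handleOffsetCommitRequest above.
val offsetRetention = 24L * 60 * 60 * 1000  // offsetManager.config.offsetsRetentionMs (assumed default)
val currentTimestamp = 1429945200000L       // SystemTime.milliseconds when the commit is handled
// v2 commits, and v1 commits without an explicit timestamp, expire retention ms from now:
val defaultExpireTimestamp = offsetRetention + currentTimestamp
// a v1 commit carrying an explicit commit timestamp t expires retention ms after t instead:
val t = 1429941600000L
val explicitExpireTimestamp = offsetRetention + t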
@@ -449,21 +475,46 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
 
-    val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
-      metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
-    )
-    val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-    val knownStatus =
-      if (knownTopicPartitions.size > 0)
-        offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
-      else
-        Map.empty[TopicAndPartition, OffsetMetadataAndError]
-    val status = unknownStatus ++ knownStatus
+    val response = if (offsetFetchRequest.versionId == 0) {
+      // version 0 reads offsets from ZK
+      val responseInfo = offsetFetchRequest.requestInfo.map( topicAndPartition => {
+        val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
+        try {
+          if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
+            (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
+          } else {
+            val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
+            payloadOpt match {
+              case Some(payload) => (topicAndPartition, OffsetMetadataAndError(payload.toLong))
+              case None => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
+            }
+          }
+        } catch {
+          case e: Throwable =>
+            (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,
+              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+        }
+      })
 
-    val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+      OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
+    } else {
+      // version 1 reads offsets from Kafka
+      val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
+        metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
+      )
+      val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
+      val knownStatus =
+        if (knownTopicPartitions.size > 0)
+          offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+        else
+          Map.empty[TopicAndPartition, OffsetMetadataAndError]
+      val status = unknownStatus ++ knownStatus
+
+      OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+    }
 
     trace("Sending offset fetch response %s for correlation id %d to client %s."
-      .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
+          .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
 
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
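Both v0 branches (commit above, fetch just shown) share the same ZooKeeper layout through the existing ZKGroupTopicDirs helper. A small sketch; the group and topic names are illustrative, and the resulting path follows the standard /consumers layout:

// Sketch of the ZK path both v0 branches read and write.
import kafka.utils.ZKGroupTopicDirs

val dirs = new ZKGroupTopicDirs("test-group", "test-topic")
val offsetPath = dirs.consumerOffsetDir + "/" + 0
// offsetPath == "/consumers/test-group/offsets/test-topic/0": the commit handler
// writes the offset there as a string and the fetch handler reads it back with readDataMaybeNull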
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index dbf9f48..5717165 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -204,8 +204,8 @@ object SerializationTestUtils {
       versionId = 0,
       groupId = "group 1",
       requestInfo = collection.immutable.Map(
-        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata", SystemTime.milliseconds),
-        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata, SystemTime.milliseconds)
+        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(42L, "some metadata"),
+        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(100L, OffsetMetadata.NoMetadata)
       ))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 652208a..0fa5938 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -85,7 +85,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = Seq(server))
 
-    val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
+    val commitRequest = OffsetCommitRequest(group, immutable.Map(topicAndPartition -> OffsetAndMetadata(offset = 42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
     assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
@@ -227,16 +227,19 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
 
-    // v0 version commit request with commit timestamp set to -1
-    // should not expire
+    // v0 version commit request
+    // should not exist with fetch version 1 since it was stored in ZK
     val commitRequest0 = OffsetCommitRequest(
       groupId = group,
-      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata", -1L)),
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata")),
       versionId = 0
     )
     assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
-    Thread.sleep(retentionCheckInterval * 2)
-    assertEquals(1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+    assertEquals(-1L, simpleConsumer.fetchOffsets(fetchRequest).requestInfo.get(topicPartition).get.offset)
+
+    // should exist with fetch version 0
+    assertEquals(1L, simpleConsumer.fetchOffsets(OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)), versionId = 0)).requestInfo.get(topicPartition).get.offset)
+
     // v1 version commit request with commit timestamp set to -1
     // should not expire
-- 
1.7.10.2 (Apple Git-33)