From 47d06911e32445628510f7ff8cc21610cc73d198 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 14 Oct 2014 14:57:49 -0700 Subject: [PATCH 1/3] KAFKA-1637 Return correct error code and offsets for OffsetFetchRequest for unknown topics/partitions vs no associated consumer. --- .../scala/kafka/common/OffsetMetadataAndError.scala | 1 + core/src/main/scala/kafka/server/KafkaApis.scala | 8 +++++++- .../scala/unit/kafka/server/OffsetCommitTest.scala | 19 ++++++++++++++++--- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index 1586243..4cabffe 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -51,5 +51,6 @@ object OffsetMetadataAndError { val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError) val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode) val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode) + val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 67f2833..6ad64d2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -505,7 +505,13 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] - val status = offsetManager.getOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap + // Missing + val (missingTopicPartitions, availableTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => + replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition).isEmpty + ) + val missingStatus = missingTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap + val availableStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, availableTopicPartitions).toMap + val status = missingStatus ++ availableStatus val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 2d93250..8c5364f 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -116,7 +116,13 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val topic1 = "topic-1" val topic2 = "topic-2" val topic3 = "topic-3" - val topic4 = "topic-4" + val topic4 = "topic-4" // Topic that group never consumes + val topic5 = "topic-5" // Non-existent topic + + createTopic(zkClient, topic1, servers = Seq(server), numPartitions = 1) + createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 2) + createTopic(zkClient, topic3, servers = Seq(server), numPartitions = 1) + createTopic(zkClient, topic4, servers = Seq(server), numPartitions = 1) val commitRequest = OffsetCommitRequest("test-group", immutable.Map( TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"), @@ -136,7 +142,8 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { TopicAndPartition(topic3, 0), TopicAndPartition(topic2, 1), TopicAndPartition(topic3, 1), // An unknown partition - TopicAndPartition(topic4, 0) // An unknown topic + TopicAndPartition(topic4, 0), // An unused topic + TopicAndPartition(topic5, 0) // An unknown topic )) val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest) @@ -144,8 +151,12 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error) - assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) + assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error) + assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) + assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get) + assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get) assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata) assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata) @@ -153,6 +164,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata) assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata) assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata) + assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.metadata) assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset) assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset) @@ -160,6 +172,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset) assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset) assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset) + assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.offset) } @Test -- 2.1.2 From 1e596072216f761886ca467a73a92a0bc21e7c71 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Tue, 14 Oct 2014 23:36:22 -0700 Subject: [PATCH 2/3] Use MetadataCache instead of ReplicaManager to check for non-existent topics and partitions. --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6ad64d2..fcffe7b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -507,7 +507,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Missing val (missingTopicPartitions, availableTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => - replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition).isEmpty + metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty ) val missingStatus = missingTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap val availableStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, availableTopicPartitions).toMap -- 2.1.2 From 310c39eca4c35cfd430b014fe5ddb290c5a1a1ca Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 15 Oct 2014 14:35:21 -0700 Subject: [PATCH 3/3] Updating naming --- core/src/main/scala/kafka/server/KafkaApis.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index fcffe7b..85498b4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -505,13 +505,12 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] - // Missing - val (missingTopicPartitions, availableTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => + val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty ) - val missingStatus = missingTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap - val availableStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, availableTopicPartitions).toMap - val status = missingStatus ++ availableStatus + val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap + val knownStatus = offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap + val status = unknownStatus ++ knownStatus val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) -- 2.1.2