From 99a4440f02df41335deb5023e1462692a26b7676 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 14 Jan 2015 17:50:10 -0800 Subject: [PATCH] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. --- core/src/main/scala/kafka/server/KafkaApis.scala | 12 ++++++------ .../src/main/scala/kafka/server/OffsetManager.scala | 20 +++++++++++++++----- .../scala/unit/kafka/server/OffsetCommitTest.scala | 21 ++++++++++++++++++++- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ec8d9f7..5a03df4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -148,6 +148,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequest.consumerId, offsetCommitRequest.groupGenerationId, offsetCommitRequest.requestInfo, + metadataCache, sendResponseCallback) } @@ -265,7 +266,7 @@ class KafkaApis(val requestChannel: RequestChannel, val hw = localReplica.highWatermark.messageOffset if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw) - else + else allOffsets } } @@ -289,19 +290,19 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetResponse(offsetRequest.correlationId, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - + def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { logManager.getLog(topicAndPartition) match { - case Some(log) => + case Some(log) => fetchOffsetsBefore(log, timestamp, maxNumOffsets) - case None => + case None => if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) Seq(0L) else Nil } } - + private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val segsArray = log.logSegments.toArray var offsetTimeArray: Array[(Long, Long)] = null @@ -448,4 +449,3 @@ class KafkaApis(val requestChannel: RequestChannel, debug("Shut down complete.") } } - diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 0bdd42f..0dcc6f6 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -210,12 +210,20 @@ class OffsetManager(val config: OffsetManagerConfig, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + metadataCache: MetadataCache, responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { - - // first filter out partitions with offset metadata size exceeding limit + // check if there are any non-existent topics + val topics = offsetMetadata.map { case (topicAndPartition, offsetMetadata) => + topicAndPartition.topic + }.toSet + val topicResponses = metadataCache.getTopicMetadata(topics) + val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet + + // first filter out partitions with offset metadata size exceeding limit or + // if its a non existing topic // TODO: in the future we may want to only support atomic commit and hence fail the whole commit val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) + validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition.topic) } // construct the message set to append @@ -239,7 +247,7 @@ class OffsetManager(val config: OffsetManagerConfig, .format(responseStatus, offsetTopicPartition)) // construct the commit response status and insert - // the offset and metadata to cache iff the append status has no error + // the offset and metadata to cache if the append status has no error val status = responseStatus(offsetTopicPartition) val responseCode = @@ -264,7 +272,9 @@ class OffsetManager(val config: OffsetManagerConfig, // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) + if (nonExistentTopics.contains(topicAndPartition.topic)) + (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode) + else if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) else (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 5b93239..06a2edf 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -213,4 +213,23 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get) } + + @Test + def testNonExistingTopicOffsetCommit() { + val topic1 = "topicDoesNotExists" + val topic2 = "topic-2" + + createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1) + + // Commit an offset + val expectedReplicaAssignment = Map(0 -> List(1)) + val commitRequest = OffsetCommitRequest(group, immutable.Map( + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L), + TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L) + )) + val commitResponse = simpleConsumer.commitOffsets(commitRequest) + + assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get) + assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get) + } } -- 1.9.3 (Apple Git-50)