From d256a79e64eca6b8dbc3f83e050d94bc6d4a58ad Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 14 Jan 2015 17:50:10 -0800 Subject: [PATCH 1/2] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. --- core/src/main/scala/kafka/server/KafkaApis.scala | 12 ++++++------ .../src/main/scala/kafka/server/OffsetManager.scala | 20 +++++++++++++++----- .../scala/unit/kafka/server/OffsetCommitTest.scala | 21 ++++++++++++++++++++- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6ee7d88..252850e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -156,6 +156,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequest.consumerId, offsetCommitRequest.groupGenerationId, offsetCommitRequest.requestInfo, + metadataCache, sendResponseCallback) } @@ -273,7 +274,7 @@ class KafkaApis(val requestChannel: RequestChannel, val hw = localReplica.highWatermark.messageOffset if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw) - else + else allOffsets } } @@ -297,19 +298,19 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetResponse(offsetRequest.correlationId, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - + def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { logManager.getLog(topicAndPartition) match { - case Some(log) => + case Some(log) => fetchOffsetsBefore(log, timestamp, maxNumOffsets) - case None => + case None => if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) Seq(0L) else Nil } } - + private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val segsArray = log.logSegments.toArray var offsetTimeArray: Array[(Long, Long)] = null @@ -496,4 +497,3 @@ class KafkaApis(val requestChannel: RequestChannel, debug("Shut down complete.") } } - diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 83d5264..a2b6fb9 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -212,12 +212,20 @@ class OffsetManager(val config: OffsetManagerConfig, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + metadataCache: MetadataCache, responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { - - // first filter out partitions with offset metadata size exceeding limit + // check if there are any non-existent topics + val topics = offsetMetadata.map { case (topicAndPartition, offsetMetadata) => + topicAndPartition.topic + }.toSet + val topicResponses = metadataCache.getTopicMetadata(topics) + val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet + + // first filter out partitions with offset metadata size exceeding limit or + // if its a non existing topic // TODO: in the future we may want to only support atomic commit and hence fail the whole commit val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) + validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition.topic) } // construct the message set to append @@ -241,7 +249,7 @@ class OffsetManager(val config: OffsetManagerConfig, .format(responseStatus, offsetTopicPartition)) // construct the commit response status and insert - // the offset and metadata to cache iff the append status has no error + // the offset and metadata to cache if the append status has no error val status = responseStatus(offsetTopicPartition) val responseCode = @@ -266,7 +274,9 @@ class OffsetManager(val config: OffsetManagerConfig, // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) + if (nonExistentTopics.contains(topicAndPartition.topic)) + (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode) + else if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) else (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 5b93239..06a2edf 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -213,4 +213,23 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get) } + + @Test + def testNonExistingTopicOffsetCommit() { + val topic1 = "topicDoesNotExists" + val topic2 = "topic-2" + + createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1) + + // Commit an offset + val expectedReplicaAssignment = Map(0 -> List(1)) + val commitRequest = OffsetCommitRequest(group, immutable.Map( + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L), + TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L) + )) + val commitResponse = simpleConsumer.commitOffsets(commitRequest) + + assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get) + assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get) + } } -- 1.9.3 (Apple Git-50) From 90a432978a534b0c297025c54f8ff483455e3fec Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 12 Feb 2015 15:17:08 -0800 Subject: [PATCH 2/2] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. Added contains method to MetadataCache. --- core/src/main/scala/kafka/server/MetadataCache.scala | 5 ++++- core/src/main/scala/kafka/server/OffsetManager.scala | 12 +++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 4c70aa7..48e7e2a 100644 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -136,6 +136,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } + def contains(topic: String): Boolean = { + cache.contains(topic) + } + private def removePartitionInfo(topic: String, partitionId: Int) = { cache.get(topic) match { case Some(infos) => { @@ -149,4 +153,3 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } } - diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index a2b6fb9..fe68267 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -215,17 +215,15 @@ class OffsetManager(val config: OffsetManagerConfig, metadataCache: MetadataCache, responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { // check if there are any non-existent topics - val topics = offsetMetadata.map { case (topicAndPartition, offsetMetadata) => - topicAndPartition.topic - }.toSet - val topicResponses = metadataCache.getTopicMetadata(topics) - val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet + val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) => + !metadataCache.contains(topicAndPartition.topic) + } // first filter out partitions with offset metadata size exceeding limit or // if its a non existing topic // TODO: in the future we may want to only support atomic commit and hence fail the whole commit val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition.topic) + validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition) } // construct the message set to append @@ -274,7 +272,7 @@ class OffsetManager(val config: OffsetManagerConfig, // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - if (nonExistentTopics.contains(topicAndPartition.topic)) + if (nonExistentTopics.contains(topicAndPartition)) (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode) else if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) -- 1.9.3 (Apple Git-50)