From 72849737ae47f6d1cbaf14073763cd1f8369b7d6 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 14 Jan 2015 17:50:10 -0800 Subject: [PATCH 1/4] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. --- core/src/main/scala/kafka/server/KafkaApis.scala | 12 ++++++------ .../src/main/scala/kafka/server/OffsetManager.scala | 20 +++++++++++++++----- .../scala/unit/kafka/server/OffsetCommitTest.scala | 21 ++++++++++++++++++++- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 703886a..6911298 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -156,6 +156,7 @@ class KafkaApis(val requestChannel: RequestChannel, offsetCommitRequest.consumerId, offsetCommitRequest.groupGenerationId, offsetCommitRequest.requestInfo, + metadataCache, sendResponseCallback) } @@ -273,7 +274,7 @@ class KafkaApis(val requestChannel: RequestChannel, val hw = localReplica.highWatermark.messageOffset if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw) - else + else allOffsets } } @@ -297,19 +298,19 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetResponse(offsetRequest.correlationId, responseMap) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - + def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { logManager.getLog(topicAndPartition) match { - case Some(log) => + case Some(log) => fetchOffsetsBefore(log, timestamp, maxNumOffsets) - case None => + case None => if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime) Seq(0L) else Nil } } - + private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { val segsArray = log.logSegments.toArray var 
offsetTimeArray: Array[(Long, Long)] = null @@ -496,4 +497,3 @@ class KafkaApis(val requestChannel: RequestChannel, debug("Shut down complete.") } } - diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 83d5264..a2b6fb9 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -212,12 +212,20 @@ class OffsetManager(val config: OffsetManagerConfig, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + metadataCache: MetadataCache, responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { - - // first filter out partitions with offset metadata size exceeding limit + // check if there are any non-existent topics + val topics = offsetMetadata.map { case (topicAndPartition, offsetMetadata) => + topicAndPartition.topic + }.toSet + val topicResponses = metadataCache.getTopicMetadata(topics) + val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet + + // first filter out partitions with offset metadata size exceeding limit or + // if its a non existing topic // TODO: in the future we may want to only support atomic commit and hence fail the whole commit val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) + validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition.topic) } // construct the message set to append @@ -241,7 +249,7 @@ class OffsetManager(val config: OffsetManagerConfig, .format(responseStatus, offsetTopicPartition)) // construct the commit response status and insert - // the offset and metadata to cache iff the append status has no error + // the offset and metadata to cache if the append status has no error val status = responseStatus(offsetTopicPartition) val responseCode = @@ -266,7 
+274,9 @@ class OffsetManager(val config: OffsetManagerConfig, // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) + if (nonExistentTopics.contains(topicAndPartition.topic)) + (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode) + else if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) else (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 5b93239..06a2edf 100644 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -213,4 +213,23 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get) } + + @Test + def testNonExistingTopicOffsetCommit() { + val topic1 = "topicDoesNotExists" + val topic2 = "topic-2" + + createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1) + + // Commit an offset + val expectedReplicaAssignment = Map(0 -> List(1)) + val commitRequest = OffsetCommitRequest(group, immutable.Map( + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L), + TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L) + )) + val commitResponse = simpleConsumer.commitOffsets(commitRequest) + + assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get) + assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get) + } } -- 1.9.3 (Apple Git-50) From 886d2d6cd24566a2976526067867317f4eb4ea86 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Thu, 12 Feb 2015 15:17:08 -0800 Subject: [PATCH 2/4] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. Added contains method to MetadataCache. 
--- core/src/main/scala/kafka/server/MetadataCache.scala | 5 ++++- core/src/main/scala/kafka/server/OffsetManager.scala | 12 +++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 4c70aa7..48e7e2a 100644 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -136,6 +136,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } + def contains(topic: String): Boolean = { + cache.contains(topic) + } + private def removePartitionInfo(topic: String, partitionId: Int) = { cache.get(topic) match { case Some(infos) => { @@ -149,4 +153,3 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } } - diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index a2b6fb9..fe68267 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -215,17 +215,15 @@ class OffsetManager(val config: OffsetManagerConfig, metadataCache: MetadataCache, responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { // check if there are any non-existent topics - val topics = offsetMetadata.map { case (topicAndPartition, offsetMetadata) => - topicAndPartition.topic - }.toSet - val topicResponses = metadataCache.getTopicMetadata(topics) - val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet + val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) => + !metadataCache.contains(topicAndPartition.topic) + } // first filter out partitions with offset metadata size exceeding limit or // if its a non existing topic // TODO: in the future we may want to only support atomic commit and hence fail the whole commit val filteredOffsetMetadata = offsetMetadata.filter { case 
(topicAndPartition, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition.topic) + validateOffsetMetadataLength(offsetAndMetadata.metadata) && !nonExistentTopics.contains(topicAndPartition) } // construct the message set to append @@ -274,7 +272,7 @@ class OffsetManager(val config: OffsetManagerConfig, // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - if (nonExistentTopics.contains(topicAndPartition.topic)) + if (nonExistentTopics.contains(topicAndPartition)) (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode) else if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) -- 1.9.3 (Apple Git-50) From 44d6f8463ad56b77dc317970cf26d194056236fd Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Mon, 16 Feb 2015 13:12:15 -0800 Subject: [PATCH 3/4] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. 
--- core/src/main/scala/kafka/server/KafkaApis.scala | 10 ++++------ core/src/main/scala/kafka/server/OffsetManager.scala | 4 +++- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6911298..13a6aff 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -49,7 +49,7 @@ class KafkaApis(val requestChannel: RequestChannel, this.logIdent = "[KafkaApi-%d] ".format(brokerId) val metadataCache = new MetadataCache(brokerId) - + offsetManager.setMetadataCache(metadataCache) /** * Top-level method that handles all requests and multiplexes to the right api */ @@ -149,14 +149,12 @@ class KafkaApis(val requestChannel: RequestChannel, val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) } - // call offset manager to store offsets offsetManager.storeOffsets( offsetCommitRequest.groupId, offsetCommitRequest.consumerId, offsetCommitRequest.groupGenerationId, offsetCommitRequest.requestInfo, - metadataCache, sendResponseCallback) } @@ -455,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel, import JavaConversions._ val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader] - + // the callback for sending a join-group response def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) { val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer @@ -473,7 +471,7 @@ class KafkaApis(val requestChannel: RequestChannel, joinGroupRequest.body.strategy(), sendResponseCallback) } - + def handleHeartbeatRequest(request: RequestChannel.Request) { val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader] @@ -490,7 +488,7 @@ class KafkaApis(val 
requestChannel: RequestChannel, heartbeatRequest.body.groupGenerationId(), sendResponseCallback) } - + def close() { // TODO currently closing the API is an no-op since the API no longer maintain any modules // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index fe68267..30f6989 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -95,6 +95,7 @@ class OffsetManager(val config: OffsetManagerConfig, private val loadingPartitions: mutable.Set[Int] = mutable.Set() private val shuttingDown = new AtomicBoolean(false) + private var metadataCache: MetadataCache = null this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: " @@ -164,6 +165,8 @@ class OffsetManager(val config: OffsetManagerConfig, debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs)) } + def setMetadataCache(metadataCache: MetadataCache) { this.metadataCache = metadataCache } + def offsetsTopicConfig: Properties = { val props = new Properties props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString) @@ -212,7 +215,6 @@ class OffsetManager(val config: OffsetManagerConfig, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], - metadataCache: MetadataCache, responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { // check if there are any non-existent topics val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) => -- 1.9.3 (Apple Git-50) From 135dab0f6b05b637fe29901986884d38e42d2d30 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Wed, 18 Feb 2015 13:12:28 -0800 Subject: [PATCH 4/4] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. 
--- core/src/main/scala/kafka/server/KafkaApis.scala | 6 +++--- core/src/main/scala/kafka/server/KafkaServer.scala | 9 +++++++-- core/src/main/scala/kafka/server/OffsetManager.scala | 5 ++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 13a6aff..35af98f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -45,11 +45,11 @@ class KafkaApis(val requestChannel: RequestChannel, val controller: KafkaController, val zkClient: ZkClient, val brokerId: Int, - val config: KafkaConfig) extends Logging { + val config: KafkaConfig, + val metadataCache: MetadataCache) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) - val metadataCache = new MetadataCache(brokerId) - offsetManager.setMetadataCache(metadataCache) + /** * Top-level method that handles all requests and multiplexes to the right api */ diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7e5ddcb..210067e 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -68,6 +68,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val kafkaScheduler = new KafkaScheduler(config.backgroundThreads) var kafkaHealthcheck: KafkaHealthcheck = null + var metadataCache: MetadataCache = null var zkClient: ZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) @@ -130,6 +131,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) replicaManager.startup() + /* start metadata cache */ + metadataCache = new MetadataCache(config.brokerId) + /* start offset manager */ offsetManager = createOffsetManager() @@ -142,7 +146,8 
@@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg consumerCoordinator.startup() /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config) + apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, + kafkaController, zkClient, config.brokerId, config, metadataCache) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -404,7 +409,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) - new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler) + new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache) } /** diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 30f6989..8d41520 100644 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -86,7 +86,8 @@ object OffsetManagerConfig { class OffsetManager(val config: OffsetManagerConfig, replicaManager: ReplicaManager, zkClient: ZkClient, - scheduler: Scheduler) extends Logging with KafkaMetricsGroup { + scheduler: Scheduler, + metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup { /* offsets and metadata cache */ private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata] @@ -95,7 +96,6 @@ class OffsetManager(val config: OffsetManagerConfig, private val loadingPartitions: mutable.Set[Int] = mutable.Set() private val shuttingDown = new AtomicBoolean(false) - private var 
metadataCache: MetadataCache = null this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: " @@ -165,7 +165,6 @@ class OffsetManager(val config: OffsetManagerConfig, debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs)) } - def setMetadataCache(metadataCache: MetadataCache) { this.metadataCache = metadataCache } def offsetsTopicConfig: Properties = { val props = new Properties -- 1.9.3 (Apple Git-50)