From a6296c0517e2683551ede44f23b484cedf57f7b7 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Wed, 14 Jan 2015 17:50:10 -0800
Subject: [PATCH 1/5] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic.

---
 core/src/main/scala/kafka/server/KafkaApis.scala     | 12 ++++++------
 core/src/main/scala/kafka/server/OffsetManager.scala | 20 +++++++++++++++-----
 .../scala/unit/kafka/server/OffsetCommitTest.scala   | 18 ++++++++++++++++++
 3 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 703886a..6911298 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -156,6 +156,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetCommitRequest.consumerId,
       offsetCommitRequest.groupGenerationId,
       offsetCommitRequest.requestInfo,
+      metadataCache,
       sendResponseCallback)
   }
 
@@ -273,7 +274,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val hw = localReplica.highWatermark.messageOffset
       if (allOffsets.exists(_ > hw))
         hw +: allOffsets.dropWhile(_ > hw)
-      else 
+      else
         allOffsets
     }
   }
@@ -297,19 +298,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
-  
+
   def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     logManager.getLog(topicAndPartition) match {
-      case Some(log) => 
+      case Some(log) =>
         fetchOffsetsBefore(log, timestamp, maxNumOffsets)
-      case None => 
+      case None =>
         if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
           Seq(0L)
         else
           Nil
     }
   }
-  
+
   private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = log.logSegments.toArray
     var offsetTimeArray: Array[(Long, Long)] = null
@@ -496,4 +497,3 @@ class KafkaApis(val requestChannel: RequestChannel,
     debug("Shut down complete.")
   }
 }
-
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 83d5264..a2b6fb9 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -212,12 +212,20 @@ class OffsetManager(val config: OffsetManagerConfig,
                     consumerId: String,
                     generationId: Int,
                     offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
+                    metadataCache: MetadataCache,
                     responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
-
-    // first filter out partitions with offset metadata size exceeding limit
+    // check if there are any non-existent topics
+    val topics = offsetMetadata.map { case (topicAndPartition, offsetMetadata) =>
+      topicAndPartition.topic
+    }.toSet
+    val topicResponses = metadataCache.getTopicMetadata(topics)
+    val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+
+    // first filter out partitions whose offset metadata size exceeds the limit
+    // or whose topic does not exist
     // TODO: in the future we may want to only support atomic commit and hence fail the whole commit
     val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+      validateOffsetMetadataLength(offsetAndMetadata.metadata) && !nonExistentTopics.contains(topicAndPartition.topic)
     }
 
     // construct the message set to append
@@ -241,7 +249,7 @@ class OffsetManager(val config: OffsetManagerConfig,
           .format(responseStatus, offsetTopicPartition))
 
       // construct the commit response status and insert
-      // the offset and metadata to cache iff the append status has no error
+      // the offset and metadata to cache if the append status has no error
       val status = responseStatus(offsetTopicPartition)
 
       val responseCode =
@@ -266,7 +274,9 @@ class OffsetManager(val config: OffsetManagerConfig,
 
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+        if (nonExistentTopics.contains(topicAndPartition.topic))
+          (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
+        else if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
           (topicAndPartition, responseCode)
         else
           (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index a2bb885..a37a74d 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -206,4 +206,22 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
   }
+
+  @Test
+  def testNonExistingTopicOffsetCommit() {
+    val topic1 = "topicDoesNotExist"
+    val topic2 = "topic-2"
+
+    createTopic(zkClient, topic2, servers = Seq(server), numPartitions = 1)
+
+    // commit an offset for one existing and one non-existent topic
+    val commitRequest = OffsetCommitRequest(group, immutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L),
+      TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=42L)
+    ))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+  }
 }
-- 
1.9.3 (Apple Git-50)
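
Note on PATCH 1/5: storeOffsets now resolves each partition's commit status in three tiers: topics missing from the metadata cache fail with UnknownTopicOrPartitionCode, oversized metadata fails with OffsetMetadataTooLargeCode, and everything else inherits the result of the append to the offsets topic. The sketch below condenses that decision; it is illustrative rather than a copy of the patch, and appendResultCode and maxMetadataSize are stand-ins for the responseCode computed from the log append and config.maxMetadataSize.

    import kafka.common.{ErrorMapping, TopicAndPartition}

    // Condensed per-partition error resolution, mirroring the three tiers above.
    def resolveCommitCode(partition: TopicAndPartition,
                          metadata: String,
                          nonExistentTopics: Set[String],
                          maxMetadataSize: Int,
                          appendResultCode: Short): Short = {
      if (nonExistentTopics.contains(partition.topic))
        ErrorMapping.UnknownTopicOrPartitionCode    // topic unknown to this broker
      else if (metadata != null && metadata.length > maxMetadataSize)
        ErrorMapping.OffsetMetadataTooLargeCode     // metadata exceeds the configured cap
      else
        appendResultCode                            // inherit the log-append result
    }
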
From d49f3d93e50393843c5c2a4bdf466dff1811f6e5 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Thu, 12 Feb 2015 15:17:08 -0800
Subject: [PATCH 2/5] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic. Added contains method to MetadataCache.

---
 core/src/main/scala/kafka/server/MetadataCache.scala |  5 ++++-
 core/src/main/scala/kafka/server/OffsetManager.scala | 12 +++++-------
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 4c70aa7..48e7e2a 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -136,6 +136,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 
+  def contains(topic: String): Boolean = {
+    cache.contains(topic)
+  }
+
   private def removePartitionInfo(topic: String, partitionId: Int) = {
     cache.get(topic) match {
       case Some(infos) => {
@@ -149,4 +153,3 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
     }
   }
 }
-
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index a2b6fb9..fe68267 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -215,17 +215,15 @@ class OffsetManager(val config: OffsetManagerConfig,
                     metadataCache: MetadataCache,
                     responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
     // check if there are any non-existent topics
-    val topics = offsetMetadata.map { case (topicAndPartition, offsetMetadata) =>
-      topicAndPartition.topic
-    }.toSet
-    val topicResponses = metadataCache.getTopicMetadata(topics)
-    val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+    val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) =>
+      !metadataCache.contains(topicAndPartition.topic)
+    }
 
     // first filter out partitions whose offset metadata size exceeds the limit
     // or whose topic does not exist
     // TODO: in the future we may want to only support atomic commit and hence fail the whole commit
     val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata) && !nonExistentTopics.contains(topicAndPartition.topic)
+      validateOffsetMetadataLength(offsetAndMetadata.metadata) && !nonExistentTopics.contains(topicAndPartition)
     }
 
     // construct the message set to append
@@ -274,7 +272,7 @@ class OffsetManager(val config: OffsetManagerConfig,
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-        if (nonExistentTopics.contains(topicAndPartition.topic))
+        if (nonExistentTopics.contains(topicAndPartition))
           (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
         else if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
           (topicAndPartition, responseCode)
-- 
1.9.3 (Apple Git-50)
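
Note on PATCH 2/5: replacing the getTopicMetadata round trip with MetadataCache.contains also changes nonExistentTopics from a Set[String] of topic names to the filtered Map itself, so membership is now tested with the full TopicAndPartition key. The self-contained snippet below (plain Scala with hypothetical stand-in types, not the broker classes) shows why that works: filtering a Map yields a Map, and Map.contains tests keys.

    object ContainsDemo extends App {
      case class TopicAndPartition(topic: String, partition: Int)

      val knownTopics = Set("topic-2")    // stands in for the MetadataCache contents
      val offsets = Map(
        TopicAndPartition("topicDoesNotExist", 0) -> 42L,
        TopicAndPartition("topic-2", 0) -> 42L)

      // same shape as the patch: the filtered map doubles as a key set
      val nonExistent = offsets.filter { case (tp, _) => !knownTopics.contains(tp.topic) }

      assert(nonExistent.contains(TopicAndPartition("topicDoesNotExist", 0)))
      assert(!nonExistent.contains(TopicAndPartition("topic-2", 0)))
    }
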
From cd074a140602ea05669ac6314695c8140034f0c5 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Mon, 16 Feb 2015 13:12:15 -0800
Subject: [PATCH 3/5] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic.

---
 core/src/main/scala/kafka/server/KafkaApis.scala     | 10 ++++------
 core/src/main/scala/kafka/server/OffsetManager.scala |  4 +++-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6911298..13a6aff 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,7 +49,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   val metadataCache = new MetadataCache(brokerId)
-  
+  offsetManager.setMetadataCache(metadataCache)
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
@@ -149,14 +149,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
     }
-
     // call offset manager to store offsets
     offsetManager.storeOffsets(
       offsetCommitRequest.groupId,
       offsetCommitRequest.consumerId,
       offsetCommitRequest.groupGenerationId,
       offsetCommitRequest.requestInfo,
-      metadataCache,
       sendResponseCallback)
   }
 
@@ -455,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     import JavaConversions._
 
     val joinGroupRequest = request.requestObj.asInstanceOf[JoinGroupRequestAndHeader]
-    
+
     // the callback for sending a join-group response
     def sendResponseCallback(partitions: List[TopicAndPartition], generationId: Int, errorCode: Short) {
       val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
@@ -473,7 +471,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       joinGroupRequest.body.strategy(),
       sendResponseCallback)
   }
-  
+
   def handleHeartbeatRequest(request: RequestChannel.Request) {
     val heartbeatRequest = request.requestObj.asInstanceOf[HeartbeatRequestAndHeader]
 
@@ -490,7 +488,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       heartbeatRequest.body.groupGenerationId(),
       sendResponseCallback)
   }
-  
+
   def close() {
     // TODO currently closing the API is an no-op since the API no longer maintain any modules
     // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index fe68267..30f6989 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -95,6 +95,7 @@ class OffsetManager(val config: OffsetManagerConfig,
 
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
   private val shuttingDown = new AtomicBoolean(false)
+  private var metadataCache: MetadataCache = null
 
   this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
@@ -164,6 +165,8 @@ class OffsetManager(val config: OffsetManagerConfig,
     debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
   }
 
+  def setMetadataCache(metadataCache: MetadataCache) { this.metadataCache = metadataCache }
+
   def offsetsTopicConfig: Properties = {
     val props = new Properties
     props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
@@ -212,7 +215,6 @@ class OffsetManager(val config: OffsetManagerConfig,
                     consumerId: String,
                     generationId: Int,
                     offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
-                    metadataCache: MetadataCache,
                     responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
     // check if there are any non-existent topics
     val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) =>
-- 
1.9.3 (Apple Git-50)
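
Note on PATCH 3/5: the cache is now handed to the OffsetManager through a setter called from the KafkaApis constructor, which removes the extra storeOffsets parameter but introduces temporal coupling: the manager exists before setMetadataCache runs, and any commit handled in that window would dereference a null cache. The toy classes below (hypothetical stand-ins, not the Kafka types) illustrate the hazard.

    class Cache { def contains(topic: String): Boolean = true }

    class Manager {
      private var cache: Cache = null                 // unset until wired
      def setCache(c: Cache): Unit = { cache = c }
      // throws NullPointerException if invoked before setCache
      def knowsTopic(topic: String): Boolean = cache.contains(topic)
    }
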
From 4727d91be2d0b2d0def705541f08d437ca8c944a Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Wed, 18 Feb 2015 13:12:28 -0800
Subject: [PATCH 4/5] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic.

---
 core/src/main/scala/kafka/server/KafkaApis.scala     | 6 +++---
 core/src/main/scala/kafka/server/KafkaServer.scala   | 9 +++++++--
 core/src/main/scala/kafka/server/OffsetManager.scala | 5 ++---
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 13a6aff..35af98f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,11 +45,11 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val controller: KafkaController,
                 val zkClient: ZkClient,
                 val brokerId: Int,
-                val config: KafkaConfig) extends Logging {
+                val config: KafkaConfig,
+                val metadataCache: MetadataCache) extends Logging {
 
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  val metadataCache = new MetadataCache(brokerId)
-  offsetManager.setMetadataCache(metadataCache)
+
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 426e522..0a9a811 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -68,6 +68,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
 
   var kafkaHealthcheck: KafkaHealthcheck = null
+  var metadataCache: MetadataCache = null
   var zkClient: ZkClient = null
 
   val correlationId: AtomicInteger = new AtomicInteger(0)
@@ -130,6 +131,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
         replicaManager.startup()
 
+        /* start metadata cache */
+        metadataCache = new MetadataCache(config.brokerId)
+
         /* start offset manager */
         offsetManager = createOffsetManager()
 
@@ -142,7 +146,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         consumerCoordinator.startup()
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config)
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator,
+          kafkaController, zkClient, config.brokerId, config, metadataCache)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
 
         brokerState.newState(RunningAsBroker)
@@ -402,7 +407,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler)
+    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache)
   }
 
   /**
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 30f6989..8d41520 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -86,7 +86,8 @@ object OffsetManagerConfig {
 class OffsetManager(val config: OffsetManagerConfig,
                     replicaManager: ReplicaManager,
                     zkClient: ZkClient,
-                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+                    scheduler: Scheduler,
+                    metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup {
 
   /* offsets and metadata cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -95,7 +96,6 @@ class OffsetManager(val config: OffsetManagerConfig,
 
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
   private val shuttingDown = new AtomicBoolean(false)
-  private var metadataCache: MetadataCache = null
 
   this.logIdent = "[Offset Manager on Broker " + replicaManager.config.brokerId + "]: "
 
@@ -165,7 +165,6 @@ class OffsetManager(val config: OffsetManagerConfig,
     debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
   }
 
-  def setMetadataCache(metadataCache: MetadataCache) { this.metadataCache = metadataCache }
 
   def offsetsTopicConfig: Properties = {
     val props = new Properties
-- 
1.9.3 (Apple Git-50)
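
Note on PATCH 4/5: the setter is replaced with constructor injection; KafkaServer builds the single MetadataCache and passes it to both OffsetManager and KafkaApis, so neither collaborator can be observed in a half-wired state. A minimal sketch of that ownership shape, again with stand-in types rather than the real Kafka signatures:

    class Cache
    class Manager(val cache: Cache)
    class Apis(val cache: Cache)

    class Server {
      val cache = new Cache              // one shared instance, owned by the server
      val manager = new Manager(cache)   // wired at construction, no setter window
      val apis = new Apis(cache)
    }
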
From eb28c35fde5243f732f914d88469d2aae6928efc Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Fri, 27 Feb 2015 13:49:46 -0800
Subject: [PATCH 5/5] KAFKA-1852. OffsetCommitRequest can commit offset on unknown topic.

---
 core/src/main/scala/kafka/server/KafkaServer.scala   | 6 ++----
 core/src/main/scala/kafka/server/MetadataCache.scala | 4 +++-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 0a9a811..8e3def9 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -68,7 +68,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
 
   var kafkaHealthcheck: KafkaHealthcheck = null
-  var metadataCache: MetadataCache = null
+  val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
+
   var zkClient: ZkClient = null
 
   val correlationId: AtomicInteger = new AtomicInteger(0)
@@ -131,9 +132,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
         replicaManager.startup()
 
-        /* start metadata cache */
-        metadataCache = new MetadataCache(config.brokerId)
-
         /* start offset manager */
         offsetManager = createOffsetManager()
 
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 48e7e2a..6aef6e4 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -137,7 +137,9 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
   }
 
   def contains(topic: String): Boolean = {
-    cache.contains(topic)
+    inReadLock(partitionMetadataLock) {
+      cache.contains(topic)
+    }
   }
 
   private def removePartitionInfo(topic: String, partitionId: Int) = {
-- 
1.9.3 (Apple Git-50)
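
Note on PATCH 5/5: two refinements close out the series. KafkaServer.metadataCache becomes an eagerly initialized val, and MetadataCache.contains now reads under the same partitionMetadataLock as the other readers, since the underlying map is mutated concurrently by UpdateMetadata requests. The snippet below is an equivalent, self-contained form of that guarded read, using a bare ReentrantReadWriteLock in place of the inReadLock helper (which expands to essentially this lock/try/finally).

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable

    class GuardedCache {
      private val lock = new ReentrantReadWriteLock
      private val cache = mutable.Map.empty[String, Int]

      def contains(topic: String): Boolean = {
        lock.readLock().lock()
        try cache.contains(topic)       // read under the shared read lock
        finally lock.readLock().unlock()
      }
    }
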