From 086a7ca3ebc4514331b099492bf40c2be209721c Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Mon, 30 Mar 2015 14:45:30 -0700 Subject: [PATCH 1/2] KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted. --- core/src/main/scala/kafka/server/OffsetManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 18680ce..9f57c84 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -122,7 +122,7 @@ class OffsetManager(val config: OffsetManagerConfig, val startMs = SystemTime.milliseconds val staleOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) => - offsetAndMetadata.expireTimestamp < startMs + offsetAndMetadata.expireTimestamp < startMs || !metadataCache.contains(groupTopicPartition.topicPartition.topic) } debug("Found %d expired offsets.".format(staleOffsets.size)) -- 2.3.2 (Apple Git-55) From 072bac408f2afa99c854547585f896f60bb97d88 Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sun, 3 May 2015 10:37:04 -0700 Subject: [PATCH 2/2] KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted. --- .../scala/unit/kafka/server/OffsetCommitTest.scala | 32 +++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 652208a..2a49f9d 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -20,6 +20,7 @@ package kafka.server import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest} import kafka.consumer.SimpleConsumer import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition} +import kafka.admin.AdminUtils import kafka.utils._ import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness @@ -49,7 +50,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { @Before override def setUp() { super.setUp() - val config: Properties = createBrokerConfig(1, zkConnect) + val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true) config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString) val logDirPath = config.getProperty("log.dir") @@ -287,6 +288,35 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { } @Test + def testOffsetDeleteAfterTopicDeletion() { + // set up topic partition + val topic = "topic" + val topicPartition = TopicAndPartition(topic, 0) + createTopic(zkClient, topic, servers = Seq(server), numPartitions = 1) + + val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0))) + + // v0 version commit request with commit timestamp set to -1 + // should not expire + val commitRequest0 = OffsetCommitRequest( + groupId = group, + requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata", -1L)), + versionId = 0 + ) + assertEquals(ErrorMapping.NoError, simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get) + + // start topic deletion + AdminUtils.deleteTopic(zkClient, topic) + TestUtils.verifyTopicDeletion(zkClient, topic, 1, Seq(server)) + Thread.sleep(retentionCheckInterval * 2) + + // check if offsets deleted + val offsetMetadataAndErrorMap = server.offsetManager.getOffsets(group, Seq(TopicAndPartition(topic, 0))) + val offsetMetadataAndError = offsetMetadataAndErrorMap.get(topicPartition).get + assertEquals(OffsetMetadataAndError.NoOffset, offsetMetadataAndError) + } + + @Test def testNonExistingTopicOffsetCommit() { val topic1 = "topicDoesNotExists" val topic2 = "topic-2" -- 2.3.2 (Apple Git-55)