From c3129acf8fd7cf8963484928ca315762375a0030 Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Fri, 12 Dec 2014 14:47:42 -0800 Subject: [PATCH] add command to delete all consumer group information for a topic in zookeeper --- core/src/main/scala/kafka/admin/AdminUtils.scala | 25 +++- core/src/main/scala/kafka/admin/TopicCommand.scala | 24 +++- core/src/main/scala/kafka/utils/ZkUtils.scala | 12 ++ ...eleteAllConsumerGroupInfoForTopicInZKTest.scala | 142 +++++++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 9 +- 5 files changed, 204 insertions(+), 8 deletions(-) create mode 100644 core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 28b12c7..6b68ae3 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -20,7 +20,7 @@ package kafka.admin import kafka.common._ import kafka.cluster.Broker import kafka.log.LogConfig -import kafka.utils.{Logging, ZkUtils, Json} +import kafka.utils.{Logging, ZkUtils, Json, ZKGroupTopicDirs} import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random @@ -164,6 +164,29 @@ object AdminUtils extends Logging { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) } + /** + * Delete every consumer group's information about the given topic in Zookeeper. + * Only deletes consumer group information if the topic does not exist. 
+ * + * @param zkClient Zookeeper client + * @param topic Topic of the consumer group information we wish to delete + * @return whether or not we deleted the consumer group information + */ + def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) = { + if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { + false + } + else { + val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic) + groups.foreach { group => + val dirs = new ZKGroupTopicDirs(group, topic) + ZkUtils.deletePathRecursive(zkClient, dirs.consumerOwnerDir) + ZkUtils.deletePathRecursive(zkClient, dirs.consumerOffsetDir) + } + true + } + } + def topicExists(zkClient: ZkClient, topic: String): Boolean = zkClient.exists(ZkUtils.getTopicPath(topic)) diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 285c033..15b5627 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -19,7 +19,7 @@ package kafka.admin import joptsimple._ import java.util.Properties -import kafka.common.AdminCommandFailedException +import kafka.common.{Topic, AdminCommandFailedException} import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -39,12 +39,12 @@ object TopicCommand { val opts = new TopicCommandOptions(args) if(args.length == 0) - CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.") + CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, delete all consumer group info, describe, or change a topic.") // should have exactly one action - val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) + val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt, opts.deleteAllConsumerGroupInfoForTopicOpt).count(opts.options.has _) 
if(actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete") + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter, --delete, or --delete-all-consumer-group-info-for-topic") opts.checkArgs() @@ -61,6 +61,8 @@ object TopicCommand { describeTopic(zkClient, opts) else if(opts.options.has(opts.deleteOpt)) deleteTopic(zkClient, opts) + else if(opts.options.has(opts.deleteAllConsumerGroupInfoForTopicOpt)) + deleteAllConsumerGroupInfoForTopic(zkClient, opts) } catch { case e: Throwable => println("Error while executing topic command " + e.getMessage) @@ -155,6 +157,17 @@ object TopicCommand { } } + def deleteAllConsumerGroupInfoForTopic(zkClient: ZkClient, opts: TopicCommandOptions) { + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + if (AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)) { + println("Deleted all consumer group information for topic %s in zookeeper.".format(topic)) + } + else { + println("--delete-all-consumer-group-info-for-topic failed because topic %s still exists.".format(topic)) + } + } + def describeTopic(zkClient: ZkClient, opts: TopicCommandOptions) { val topics = getTopics(zkClient, opts) val reportUnderReplicatedPartitions = if (opts.options.has(opts.reportUnderReplicatedPartitionsOpt)) true else false @@ -246,6 +259,9 @@ object TopicCommand { val listOpt = parser.accepts("list", "List all available topics.") val createOpt = parser.accepts("create", "Create a new topic.") val deleteOpt = parser.accepts("delete", "Delete a topic") + val deleteAllConsumerGroupInfoForTopicOpt = parser.accepts("delete-all-consumer-group-info-for-topic", + "Delete topic partition offsets and ownership information for every consumer group. 
" + + "WARNING: Assumes there are no active producers and consumers on the topic.") val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.") val describeOpt = parser.accepts("describe", "List details for the given topics.") val helpOpt = parser.accepts("help", "Print usage information.") diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 56e3e88..cbb27df 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -713,6 +713,18 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = { + val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath) + if (groups == null) Set.empty + else { + groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) => + val topics = getChildren(zkClient, new ZKGroupDirs(group).consumerGroupDir + "/offsets") + if (topics.contains(topic)) consumerGroupsForTopic + group + else consumerGroupsForTopic + } + } + } } object ZKStringSerializer extends ZkSerializer { diff --git a/core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala new file mode 100644 index 0000000..759d8b5 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ +import kafka.utils.{ZKGroupTopicDirs, ZkUtils, TestUtils} +import kafka.server.{KafkaServer, KafkaConfig} +import org.junit.Test +import kafka.consumer._ +import kafka.common.TopicAndPartition +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer} +import kafka.integration.KafkaServerTestHarness + + +class DeleteAllConsumerGroupInfoForTopicInZKTest extends JUnit3Suite with KafkaServerTestHarness { + val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_)) + + @Test + def testDoNothingWhenTopicExists() { + val topic = "test" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val groupTopicDirs = groups.map(group => new ZKGroupTopicDirs(group, topic)) + groupTopicDirs.foreach(dir => fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, "")) + + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic) + + TestUtils.waitUntilTrue(() => groupTopicDirs.forall(groupTopicDirsExist), + "DeleteAllConsumerGroupInfoForTopicInZK should do nothing when topic exists") + } + + @Test + def testDeleteAllConsumerGroupInfoForTopicAfterDeleteTopic() { + val topicToDelete = "topicToDelete" + val otherTopic = "otherTopic" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + val groupTopicDirsForTopicToDelete = groups.map(group => new 
ZKGroupTopicDirs(group, topicToDelete)) + val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic)) + groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, "")) + groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, "")) + + AdminUtils.deleteTopic(zkClient, topicToDelete) + verifyTopicDeletion(topicToDelete, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete) + + TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicDirsExist), + "Consumer group info on deleted topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicDirsExist), + "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + } + + @Test + def testConsumptionOnRecreatedTopicAfterDeleteAllConsumerGroupInfoForTopicInZK() { + val topic = "topic" + val group = "group" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val dir = new ZKGroupTopicDirs(group, topic) + fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, "") + + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic) + + TestUtils.waitUntilTrue(() => !groupTopicDirsExist(dir), + "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + //produce events + val producer = TestUtils.createNewProducer(brokerList) + produceEvents(producer, topic, List.fill(10)("test")) + + //consume events + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, "consumer") + consumerProps.put("auto.commit.enable", "false") + consumerProps.put("auto.offset.reset", "smallest") + consumerProps.put("consumer.timeout.ms", "2000") + 
consumerProps.put("fetch.wait.max.ms", "0") + val consumerConfig = new ConsumerConfig(consumerProps) + val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head + consumeEvents(messageStream, 5) + consumerConnector.commitOffsets(false) + producer.close() + consumerConnector.shutdown() + + TestUtils.waitUntilTrue(() => groupTopicDirsExist(dir), "Consumer group info should exist after consuming from a recreated topic") + } + + private def fillInConsumerGroupInfo(topic: String, group: String, consumerId: String, partition: Int, offset: Int, data: String) { + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, consumerId) + val consumerConfig = new ConsumerConfig(consumerProps) + val dir = new ZKGroupTopicDirs(group, topic) + TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset) + ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), data) + } + + private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) { + val topicAndPartition = TopicAndPartition(topic, 0) + // wait until admin path for delete topic is deleted, signaling completion of topic deletion + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted") + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), + "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted") + // ensure that the topic-partition has been deleted from all brokers' replica managers + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None), + "Replica managers should have deleted all of this topic's partitions") + // ensure 
that logs from all replicas are deleted if delete topic is marked successful in zookeeper + assertTrue("Replica logs not deleted after delete topic is complete", + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) + } + + private def groupTopicDirsExist(dir: ZKGroupTopicDirs) = { + ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir) + } + + private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) { + messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes))) + } + + private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) { + val iter = messageStream.iterator + (0 until n).foreach(_ => iter.next) + } +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 94d0028..8c40201 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -139,9 +139,10 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfigs(numConfigs: Int, - enableControlledShutdown: Boolean = true): List[Properties] = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): List[Properties] = { for((port, node) <- choosePorts(numConfigs).zipWithIndex) - yield createBrokerConfig(node, port, enableControlledShutdown) + yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic) } def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { @@ -152,7 +153,8 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfig(nodeId: Int, port: Int = choosePort(), - enableControlledShutdown: Boolean = true): Properties = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: 
Boolean = false): Properties = { val props = new Properties props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") @@ -161,6 +163,7 @@ object TestUtils extends Logging { props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") props.put("controlled.shutdown.enable", enableControlledShutdown.toString) + props.put("delete.topic.enable", enableDeleteTopic.toString) props } -- 1.7.12.4