From 598976ae0ffff12eb28fad53ae658f2efdcff9f3 Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Fri, 30 Jan 2015 11:05:35 -0800 Subject: [PATCH] Patch for KAFKA-1476 --- bin/kafka-consumer-groups.sh | 17 ++ core/src/main/scala/kafka/admin/AdminUtils.scala | 63 ++++- .../scala/kafka/admin/ConsumerGroupCommand.scala | 292 +++++++++++++++++++++ core/src/main/scala/kafka/utils/ZkUtils.scala | 20 ++ .../unit/kafka/admin/DeleteConsumerGroupTest.scala | 239 +++++++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 9 +- 6 files changed, 636 insertions(+), 4 deletions(-) create mode 100755 bin/kafka-consumer-groups.sh create mode 100644 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala create mode 100644 core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala diff --git a/bin/kafka-consumer-groups.sh b/bin/kafka-consumer-groups.sh new file mode 100755 index 0000000..f4786db --- /dev/null +++ b/bin/kafka-consumer-groups.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand $@ diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 28b12c7..e57e5aa 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -20,7 +20,7 @@ package kafka.admin import kafka.common._ import kafka.cluster.Broker import kafka.log.LogConfig -import kafka.utils.{Logging, ZkUtils, Json} +import kafka.utils._ import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random @@ -164,6 +164,67 @@ object AdminUtils extends Logging { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) } + def isConsumerGroupActive(zkClient: ZkClient, group: String) = { + ZkUtils.getConsumersInGroup(zkClient, group).nonEmpty + } + + /** + * Delete the whole directory of the given consumer group if the group is inactive. + * + * @param zkClient Zookeeper client + * @param group Consumer group + * @return whether or not we deleted the consumer group information + */ + def deleteConsumerGroupInZK(zkClient: ZkClient, group: String) = { + if (!isConsumerGroupActive(zkClient, group)) { + val dir = new ZKGroupDirs(group) + ZkUtils.deletePathRecursive(zkClient, dir.consumerGroupDir) + true + } + else false + } + + /** + * Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive. + * If the consumer group consumes no other topics, delete the whole consumer group directory. + * + * @param zkClient Zookeeper client + * @param group Consumer group + * @param topic Topic of the consumer group information we wish to delete + * @return whether or not we deleted the consumer group information for the given topic + */ + def deleteConsumerGroupInfoForTopicInZK(zkClient: ZkClient, group: String, topic: String) = { + val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group) + if (topics == Seq(topic)) { + deleteConsumerGroupInZK(zkClient, group) + } + else if (!isConsumerGroupActive(zkClient, group)) { + val dir = new ZKGroupTopicDirs(group, topic) + ZkUtils.deletePathRecursive(zkClient, dir.consumerOwnerDir) + ZkUtils.deletePathRecursive(zkClient, dir.consumerOffsetDir) + true + } + else false + } + + /** + * Delete every inactive consumer group's information about the given topic in Zookeeper. + * + * @param zkClient Zookeeper client + * @param topic Topic of the consumer group information we wish to delete + * @return whether or not we deleted the consumer group information + */ + def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String) = { + if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { + false + } + else { + val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic) + groups.foreach(group => deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic)) + true + } + } + def topicExists(zkClient: ZkClient, topic: String): Boolean = zkClient.exists(ZkUtils.getTopicPath(topic)) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala new file mode 100644 index 0000000..141fa8a --- /dev/null +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + + +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import kafka.common._ +import java.util.Properties +import kafka.client.ClientUtils +import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest} +import org.I0Itec.zkclient.exception.ZkNoNodeException +import kafka.common.TopicAndPartition +import joptsimple.{OptionSpec, OptionParser} +import scala.collection.{Set, mutable} +import kafka.consumer.SimpleConsumer +import collection.JavaConversions._ + + +object ConsumerGroupCommand { + + def main(args: Array[String]) { + val opts = new ConsumerGroupCommandOptions(args) + + if(args.length == 0) + CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.") + + // should have exactly one action + val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete") + + opts.checkArgs() + + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + + try { + if (opts.options.has(opts.listOpt)) + list(zkClient) + else if (opts.options.has(opts.describeOpt)) + describe(zkClient, opts) + else if (opts.options.has(opts.deleteOpt)) + delete(zkClient, opts) + } catch { + case e: Throwable => + println("Error while executing consumer group command " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + zkClient.close() + } + } + + def list(zkClient: ZkClient) { + ZkUtils.getConsumerGroups(zkClient).foreach(println) + } + + def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val configs = parseConfigs(opts) + val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt + val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt + val group = opts.options.valueOf(opts.groupOpt) + val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group) + if (topics.isEmpty) { + println("No topic available for consumer group provided") + } + topics.foreach(topic => describeGroupByTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + } + + def delete(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + if (opts.options.has(opts.groupOpt) && opts.options.has(opts.topicOpt)) { + deleteForTopic(zkClient, opts) + } + else if (opts.options.has(opts.groupOpt)) { + deleteForGroup(zkClient, opts) + } + else if (opts.options.has(opts.topicOpt)) { + deleteAllForTopic(zkClient, opts) + } + } + + private def deleteForGroup(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val groups = opts.options.valuesOf(opts.groupOpt) + groups.foreach { group => + try { + if (AdminUtils.deleteConsumerGroupInZK(zkClient, group)) + println("Deleted all consumer group information for group %s in zookeeper.".format(group)) + else + println("Delete for group %s failed because its consumers are still active.".format(group)) + } + catch { + case e: ZkNoNodeException => + println("Delete for group %s failed because group does not exist.".format(group)) + } + } + } + + private def deleteForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val groups = opts.options.valuesOf(opts.groupOpt) + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + groups.foreach { group => + try { + if (AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic)) + println("Deleted consumer group information for group %s topic %s in zookeeper.".format(group, topic)) + else + println("Delete for group %s topic %s failed because its consumers are still active.".format(group, topic)) + } + catch { + case e: ZkNoNodeException => + println("Delete for group %s topic %s failed because group does not exist.".format(group, topic)) + } + } + } + + private def deleteAllForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + if (AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic)) { + println("Deleted all consumer group information for topic %s in zookeeper.".format(topic)) + } + else { + println("Delete for topic %s failed because topic still exists.".format(topic)) + } + } + + private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = { + val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*""")) + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid config: all configs to be added must be in the format \"key=val\".") + val props = new Properties + configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + props + } + + private def describeGroupByTopic(zkClient: ZkClient, group: String, topic: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int) = { + val (topicPidMap, offsetMap) = getOffsetsByTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs) + processTopic(zkClient, group, topic, topicPidMap, offsetMap) + } + + private def getOffsetsByTopic(zkClient: ZkClient, group: String, topic: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int) = { + val topicList = List[String](topic) + val topicPidMap = Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq: _*) + val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _))}.toSeq + val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) + val offsetMap = mutable.Map[TopicAndPartition, Long]() + channel.send(OffsetFetchRequest(group, topicPartitions)) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + + offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => + if(offsetAndMetadata == OffsetMetadataAndError.NoOffset) { + val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool + // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) + try { + val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong + offsetMap.put(topicAndPartition, offset) + } catch { + case z: ZkNoNodeException => + if(ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir)) + offsetMap.put(topicAndPartition, -1) + else { + println("Failed to fetch offset for group %s partition %s from zookeeper".format(group, topicAndPartition)) + } + } + } + else if(offsetAndMetadata.error == ErrorMapping.NoError) + offsetMap.put(topicAndPartition, offsetAndMetadata.offset) + else { + println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) + } + } + channel.disconnect() + (topicPidMap, offsetMap.toMap) + } + + + private def processTopic(zkClient: ZkClient, group: String, topic: String, topicPidMap: Map[String, Seq[Int]], offsetMap: Map[TopicAndPartition, Long]) { + println("%s, %s, %s, %s, %s, %s, %s".format("GROUP", "TOPIC", "PID", "CURRENT OFFSET", "LOG SIZE", "LAG", "OWNER")) + topicPidMap.get(topic) match { + case Some(pids) => + pids.sorted.foreach(pid => processPartition(zkClient, group, topic, pid, offsetMap)) + case None => println("No partition found for group %s topic %s".format(group, topic)) + } + } + + private def processPartition(zkClient: ZkClient, group: String, topic: String, pid: Int, offsetMap: Map[TopicAndPartition, Long]) { + val topicPartition = TopicAndPartition(topic, pid) + val offsetOpt = offsetMap.get(topicPartition) + val groupDirs = new ZKGroupTopicDirs(group, topic) + val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1 + ZkUtils.getLeaderForPartition(zkClient, topic, pid) match { + case Some(bid) => + val consumerOpt = getConsumer(zkClient, bid) + consumerOpt match { + case Some(consumer) => + val topicAndPartition = TopicAndPartition(topic, pid) + val request = + OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + + val lagString = offsetOpt.map(o => if(o == -1) "unknown" else (logSize - o).toString) + println("%s, %s, %s, %s, %s, %s, %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), + owner.getOrElse("none"))) + case None => // ignore + } + case None => + println("No broker for partition %s - %s".format(topic, pid)) + } + } + + private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = { + try { + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match { + case Some(brokerInfoString) => + Json.parseFull(brokerInfoString) match { + case Some(m) => + val brokerInfo = m.asInstanceOf[Map[String, Any]] + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker")) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + } catch { + case t: Throwable => + println("Could not parse broker info due to " + t.getStackTraceString) + None + } + } + + class ConsumerGroupCommandOptions(args: Array[String]) { + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val groupOpt = parser.accepts("group", "The consumer group we wish to describe") + .withRequiredArg + .describedAs("consumer group") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "The topic whose consumer group information should be deleted") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val configOpt = parser.accepts("config", "Configuration for timeouts for instance --config channelSocketTimeoutMs=600") + .withRequiredArg + .describedAs("name=value") + .ofType(classOf[String]) + val listOpt = parser.accepts("list", "List all consumer groups.") + val describeOpt = parser.accepts("describe", "Describe consumer group and list offset lag related to given group.") + val deleteOpt = parser.accepts("delete", "Pass in groups to delete topic partition offsets and ownership information over the entire consumer group. " + + "Pass in groups with a topic to just delete the given topic's partition offsets and ownership information for the given consumer groups. " + + "Pass in just a topic to delete the topic partition offsets and ownership information for every consumer group. " + + "WARNING: Assumes the consumers in the group are not active.") + val options = parser.parse(args : _*) + + val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) + + def checkArgs() { + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + if (options.has(describeOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) + if (options.has(deleteOpt) && !options.has(groupOpt) && !options.has(topicOpt)) + CommandLineUtils.printUsageAndDie(parser, "Option %s either takes %s, %s, or both".format(deleteOpt, groupOpt, topicOpt)) + + // check invalid args + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt) + CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt) + CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt) + } + } +} diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index c14bd45..f99f2ff 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -749,6 +749,26 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def getConsumerGroups(zkClient: ZkClient) = { + ZkUtils.getChildren(zkClient, ConsumersPath) + } + + def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = { + ZkUtils.getChildrenParentMayNotExist(zkClient, new ZKGroupDirs(consumerGroup).consumerGroupDir + "/owners") + } + + def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = { + val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath) + if (groups == null) Set.empty + else { + groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) => + val topics = getChildren(zkClient, new ZKGroupDirs(group).consumerGroupDir + "/offsets") + if (topics.contains(topic)) consumerGroupsForTopic + group + else consumerGroupsForTopic + } + } + } } object ZKStringSerializer extends ZkSerializer { diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala new file mode 100644 index 0000000..e3a8a67 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupTest.scala @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ +import kafka.utils.{ZKGroupDirs, ZKGroupTopicDirs, ZkUtils, TestUtils} +import kafka.server.{KafkaServer, KafkaConfig} +import org.junit.Test +import kafka.consumer._ +import kafka.common.TopicAndPartition +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer} +import kafka.integration.KafkaServerTestHarness + + +class DeleteConsumerGroupTest extends JUnit3Suite with KafkaServerTestHarness { + val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_)) + + @Test + def testDeleteConsumerGroupInZK() { + val topic = "test" + val groupToDelete = "groupToDelete" + val otherGroup = "other" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, false) + fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete) + + TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(groupToDelete)), + "DeleteConsumerGroupInZK should delete the provided consumer group's directory") + TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)), + "DeleteConsumerGroupInZK should not delete unrelated consumer group directories") + } + + @Test + def testDeleteConsumerGroupInZKDoesNothingForActiveConsumerGroup() { + val topic = "test" + val groupToDelete = "groupToDelete" + val otherGroup = "other" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + fillInConsumerGroupInfo(topic, groupToDelete, "consumer", 0, 10, true) + fillInConsumerGroupInfo(topic, otherGroup, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInZK(zkClient, groupToDelete) + + TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(groupToDelete)), + "DeleteConsumerGroupInZK should not delete the provided consumer group's directory if the consumer group is still active") + TestUtils.waitUntilTrue(() => groupDirExists(new ZKGroupDirs(otherGroup)), + "DeleteConsumerGroupInZK should not delete unrelated consumer group directories") + } + + @Test + def testDeleteConsumerGroupInfoForTopicInZKForGroupConsumingOneTopic() { + val topic = "test" + val group = "group" + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + fillInConsumerGroupInfo(topic, group, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topic) + + TestUtils.waitUntilTrue(() => !groupDirExists(new ZKGroupDirs(group)), + "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's directory if it just consumes from one topic") + } + + @Test + def testDeleteConsumerGroupInfoForTopicInZKForConsumerGroupConsumingMultipleTopics() { + val topicToDelete = "topicToDelete" + val otherTopic = "other" + val group = "group" + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + + fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, false) + fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, false) + + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete) + + TestUtils.waitUntilTrue(() => !groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)), + "DeleteConsumerGroupInfoForTopicInZK should delete the provided consumer group's owner and offset directories for the given topic") + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, otherTopic)), + "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics") + } + + @Test + def testDeleteConsumerGroupInfoForTopicInZKDoesNothingForActiveConsumerGroupConsumingMultipleTopics() { + val topicToDelete = "topicToDelete" + val otherTopic = "other" + val group = "group" + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + + fillInConsumerGroupInfo(topicToDelete, group, "consumer", 0, 10, true) + fillInConsumerGroupInfo(otherTopic, group, "consumer", 0, 10, true) + + AdminUtils.deleteConsumerGroupInfoForTopicInZK(zkClient, group, topicToDelete) + + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, topicToDelete)), + "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for the given topic if the consumer group is still active") + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(new ZKGroupTopicDirs(group, otherTopic)), + "DeleteConsumerGroupInfoForTopicInZK should not delete the provided consumer group's owner and offset directories for unrelated topics") + } + + + @Test + def testDeleteAllConsumerGroupInfoForTopicInZKDoesNothingWhenTopicExists() { + val topic = "test" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val groupTopicDirs = groups.map(group => new ZKGroupTopicDirs(group, topic)) + groupTopicDirs.foreach(dir => fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false)) + + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic) + + TestUtils.waitUntilTrue(() => groupTopicDirs.forall(groupTopicOffsetAndOwnerDirsExist), + "DeleteAllConsumerGroupInfoForTopicInZK not should delete consumer group info on an existing topic") + } + + @Test + def testDeleteAllConsumerGroupInfoForTopicInZKAfterDeleteTopic() { + val topicToDelete = "topicToDelete" + val otherTopic = "otherTopic" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete)) + val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic)) + groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, false)) + groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, false)) + + AdminUtils.deleteTopic(zkClient, topicToDelete) + verifyTopicDeletion(topicToDelete, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete) + + TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicOffsetAndOwnerDirsExist), + "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicOffsetAndOwnerDirsExist), + "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + } + + @Test + def testConsumptionOnRecreatedTopicAfterDeleteAllConsumerGroupInfoForTopicInZK() { + val topic = "topic" + val group = "group" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val dir = new ZKGroupTopicDirs(group, topic) + fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, false) + + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic) + + TestUtils.waitUntilTrue(() => !groupDirExists(dir), + "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + //produce events + val producer = TestUtils.createNewProducer(brokerList) + produceEvents(producer, topic, List.fill(10)("test")) + + //consume events + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, "consumer") + consumerProps.put("auto.commit.enable", "false") + consumerProps.put("auto.offset.reset", "smallest") + consumerProps.put("consumer.timeout.ms", "2000") + consumerProps.put("fetch.wait.max.ms", "0") + val consumerConfig = new ConsumerConfig(consumerProps) + val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head + consumeEvents(messageStream, 5) + consumerConnector.commitOffsets(false) + producer.close() + consumerConnector.shutdown() + + TestUtils.waitUntilTrue(() => groupTopicOffsetAndOwnerDirsExist(dir), + "Consumer group info should exist after consuming from a recreated topic") + } + + private def fillInConsumerGroupInfo(topic: String, group: String, consumerId: String, partition: Int, offset: Int, registerConsumer: Boolean) { + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, consumerId) + val consumerConfig = new ConsumerConfig(consumerProps) + val dir = new ZKGroupTopicDirs(group, topic) + TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset) + ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), "") + ZkUtils.makeSurePersistentPathExists(zkClient, dir.consumerRegistryDir) + if (registerConsumer) { + ZkUtils.createEphemeralPathExpectConflict(zkClient, dir.consumerRegistryDir + "/" + consumerId, "") + } + } + + private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) { + val topicAndPartition = TopicAndPartition(topic, 0) + // wait until admin path for delete topic is deleted, signaling completion of topic deletion + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted") + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), + "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted") + // ensure that the topic-partition has been deleted from all brokers' replica managers + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None), + "Replica manager's should have deleted all of this topic's partitions") + // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper + assertTrue("Replica logs not deleted after delete topic is complete", + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) + } + + private def groupDirExists(dir: ZKGroupDirs) = { + ZkUtils.pathExists(zkClient, dir.consumerGroupDir) + } + + private def groupTopicOffsetAndOwnerDirsExist(dir: ZKGroupTopicDirs) = { + ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir) + } + + private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) { + messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes))) + } + + private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) { + val iter = messageStream.iterator + (0 until n).foreach(_ => iter.next) + } +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 54755e8..a4cb89a 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -140,9 +140,10 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfigs(numConfigs: Int, - enableControlledShutdown: Boolean = true): List[Properties] = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): List[Properties] = { for((port, node) <- choosePorts(numConfigs).zipWithIndex) - yield createBrokerConfig(node, port, enableControlledShutdown) + yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic) } def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { @@ -153,7 +154,8 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfig(nodeId: Int, port: Int = choosePort(), - enableControlledShutdown: Boolean = true): Properties = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): Properties = { val props = new Properties if (nodeId >= 0) props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") @@ -162,6 +164,7 @@ object TestUtils extends Logging { props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") props.put("controlled.shutdown.enable", enableControlledShutdown.toString) + props.put("delete.topic.enable", enableDeleteTopic.toString) props } -- 1.7.12.4