From 5faa2b27dca3da308a77e4be65e2cdaf128ed864 Mon Sep 17 00:00:00 2001 From: Onur Karaman Date: Fri, 9 Jan 2015 10:37:13 -0800 Subject: [PATCH] Patch for KAFKA-1476 --- bin/kafka-consumer-groups.sh | 17 ++ core/src/main/scala/kafka/admin/AdminUtils.scala | 26 ++- .../scala/kafka/admin/ConsumerGroupCommand.scala | 252 +++++++++++++++++++++ core/src/main/scala/kafka/utils/ZkUtils.scala | 20 ++ ...eleteAllConsumerGroupInfoForTopicInZKTest.scala | 157 +++++++++++++ .../test/scala/unit/kafka/utils/TestUtils.scala | 9 +- 6 files changed, 477 insertions(+), 4 deletions(-) create mode 100755 bin/kafka-consumer-groups.sh create mode 100644 core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala create mode 100644 core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala diff --git a/bin/kafka-consumer-groups.sh b/bin/kafka-consumer-groups.sh new file mode 100755 index 0000000..f4786db --- /dev/null +++ b/bin/kafka-consumer-groups.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand $@ diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 28b12c7..c22744b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -20,7 +20,7 @@ package kafka.admin import kafka.common._ import kafka.cluster.Broker import kafka.log.LogConfig -import kafka.utils.{Logging, ZkUtils, Json} +import kafka.utils.{Logging, ZkUtils, Json, ZKGroupTopicDirs} import kafka.api.{TopicMetadata, PartitionMetadata} import java.util.Random @@ -164,6 +164,30 @@ object AdminUtils extends Logging { ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic)) } + /** + * Delete every consumer group's information about the given topic in Zookeeper. + * With forceDelete as false: only deletes consumer group information if the topic does not exist. + * + * @param zkClient Zookeeper client + * @param topic Topic of the consumer group information we wish to delete + * @param forceDelete Delete the consumer group information even if the topic exists + * @return whether or not we deleted the consumer group information + */ + def deleteAllConsumerGroupInfoForTopicInZK(zkClient: ZkClient, topic: String, forceDelete: Boolean) = { + if (!forceDelete && ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) { + false + } + else { + val groups = ZkUtils.getAllConsumerGroupsForTopic(zkClient, topic) + groups.foreach { group => + val dirs = new ZKGroupTopicDirs(group, topic) + ZkUtils.deletePathRecursive(zkClient, dirs.consumerOwnerDir) + ZkUtils.deletePathRecursive(zkClient, dirs.consumerOffsetDir) + } + true + } + } + def topicExists(zkClient: ZkClient, topic: String): Boolean = zkClient.exists(ZkUtils.getTopicPath(topic)) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala new file mode 100644 index 0000000..2b82b68 --- /dev/null +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + + +import kafka.utils._ +import org.I0Itec.zkclient.ZkClient +import kafka.common._ +import java.util.Properties +import kafka.client.ClientUtils +import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, OffsetFetchResponse, OffsetFetchRequest} +import org.I0Itec.zkclient.exception.ZkNoNodeException +import kafka.common.TopicAndPartition +import joptsimple.{OptionSpec, OptionParser} +import scala.collection.{Set, mutable} +import kafka.consumer.SimpleConsumer +import collection.JavaConversions._ + + +object ConsumerGroupCommand { + + def main(args: Array[String]): Unit = { + val opts = new ConsumerGroupCommandOptions(args) + + if(args.length == 0) + CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete all consumer group info for a topic.") + + // should have exactly one action + val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteAllConsumerGroupInfoForTopicOpt).count(opts.options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete-all-consumer-group-info-for-topic") + + opts.checkArgs() + + val zkClient = new ZkClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, ZKStringSerializer) + + try { + if (opts.options.has(opts.listOpt)) + list(zkClient) + else if (opts.options.has(opts.describeOpt)) + describe(zkClient, opts) + else if (opts.options.has(opts.deleteAllConsumerGroupInfoForTopicOpt)) + deleteAllConsumerGroupInfoForTopic(zkClient, opts) + } catch { + case e: Throwable => + println("Error while executing topic command " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + zkClient.close() + } + } + + def list(zkClient: ZkClient) { + ZkUtils.getConsumerGroups(zkClient).foreach(println) + } + + def describe(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val configs = parseConfigs(opts) + val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt + val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt + val group = opts.options.valueOf(opts.groupOpt) + val topics = ZkUtils.getTopicsByConsumerGroup(zkClient, group) + + if (topics.isEmpty) { + CommandLineUtils.printUsageAndDie(opts.parser, "No Topic available for consumer group provided") + } + + topics.foreach(topic => describeGroupByTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs)) + } + + def deleteAllConsumerGroupInfoForTopic(zkClient: ZkClient, opts: ConsumerGroupCommandOptions) { + val topic = opts.options.valueOf(opts.topicOpt) + Topic.validate(topic) + val forceDelete = opts.options.has(opts.forceDeleteOpt) + if (AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic, forceDelete)) { + println("Deleted all consumer group information for topic %s in zookeeper.".format(topic)) + } + else { + println("--delete-all-consumer-group-info-for-topic failed because topic %s still exists.".format(topic)) + } + } + + private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = { + val configsToBeAdded = opts.options.valuesOf(opts.configOpt).map(_.split("""\s*=\s*""")) + require(configsToBeAdded.forall(config => config.length == 2), + "Invalid topic config: all configs to be added must be in the format \"key=val\".") + val props = new Properties + configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim)) + props + } + + private def describeGroupByTopic(zkClient: ZkClient, group: String, topic: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int) = { + val (topicPidMap, offsetMap) = getOffsetsByTopic(zkClient, group, topic, channelSocketTimeoutMs, channelRetryBackoffMs) + processTopic(zkClient, group, topic, topicPidMap, offsetMap) + } + + private def getOffsetsByTopic(zkClient: ZkClient, group: String, topic: String, channelSocketTimeoutMs: Int, channelRetryBackoffMs: Int) = { + val topicList = List[String](topic) + val topicPidMap = Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq: _*) + val topicPartitions = topicPidMap.flatMap { case (topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _))}.toSeq + val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) + val offsetMap = mutable.Map[TopicAndPartition, Long]() + channel.send(OffsetFetchRequest(group, topicPartitions)) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + + offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => + if(offsetAndMetadata == OffsetMetadataAndError.NoOffset) { + val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) + // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool + // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) + try { + val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong + offsetMap.put(topicAndPartition, offset) + } catch { + case z: ZkNoNodeException => + if(ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir)) + offsetMap.put(topicAndPartition, -1) + else + throw z + } + } + else if(offsetAndMetadata.error == ErrorMapping.NoError) + offsetMap.put(topicAndPartition, offsetAndMetadata.offset) + else { + println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) + } + } + channel.disconnect() + (topicPidMap, offsetMap.toMap) + } + + + private def processTopic(zkClient: ZkClient, group: String, topic: String, topicPidMap: Map[String, Seq[Int]], offsetMap: Map[TopicAndPartition, Long]) { + println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("GROUP", "TOPIC", "PID", "CURRENT OFFSET", "LOG SIZE", "LAG", "OWNER")) + topicPidMap.get(topic) match { + case Some(pids) => + pids.sorted.foreach(pid => processPartition(zkClient, group, topic, pid, offsetMap)) + case None => // ignore + } + } + + private def processPartition(zkClient: ZkClient, group: String, topic: String, pid: Int, offsetMap: Map[TopicAndPartition, Long]) { + val topicPartition = TopicAndPartition(topic, pid) + val offsetOpt = offsetMap.get(topicPartition) + val groupDirs = new ZKGroupTopicDirs(group, topic) + val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1 + val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() + ZkUtils.getLeaderForPartition(zkClient, topic, pid) match { + case Some(bid) => + val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid)) + consumerOpt match { + case Some(consumer) => + val topicAndPartition = TopicAndPartition(topic, pid) + val request = + OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))) + val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head + + val lagString = offsetOpt.map(o => if(o == -1) "unknown" else (logSize - o).toString) + println("%-15s, %-30s, %-3s, %-15s, %-15s, %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"), + owner match { case Some(ownerStr) => ownerStr case None => "none"})) + case None => // ignore + } + case None => + println("No broker for partition %s - %s".format(topic, pid)) + } + } + + private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = { + try { + ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match { + case Some(brokerInfoString) => + Json.parseFull(brokerInfoString) match { + case Some(m) => + val brokerInfo = m.asInstanceOf[Map[String, Any]] + val host = brokerInfo.get("host").get.asInstanceOf[String] + val port = brokerInfo.get("port").get.asInstanceOf[Int] + Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker")) + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + case None => + throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)) + } + } catch { + case t: Throwable => + println("Could not parse broker info due to " + t.getStackTraceString) + None + } + } + + class ConsumerGroupCommandOptions(args: Array[String]) { + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + + "Multiple URLS can be given to allow fail-over.") + .withRequiredArg + .describedAs("urls") + .ofType(classOf[String]) + val groupOpt = parser.accepts("group", "The consumer group we wish to describe") + .withRequiredArg + .describedAs("consumer group") + .ofType(classOf[String]) + val topicOpt = parser.accepts("topic", "The topic whose consumer group information should be deleted") + .withRequiredArg + .describedAs("topic") + .ofType(classOf[String]) + val configOpt = parser.accepts("config", "Configuration for timeouts for instance --config channelSocketTimeoutMs=600") + .withRequiredArg + .describedAs("name=value") + .ofType(classOf[String]) + val forceDeleteOpt = parser.accepts("force-delete", "If set along with delete-all-consumer-group-info-for-topic, " + + "delete topic partition offsets and ownership information for " + + "every consumer group on the given topic even if the topic exists.") + val listOpt = parser.accepts("list", "List all consumer groups.") + val describeOpt = parser.accepts("describe", "Describe Consumer group and list offset lag related to given group.") + val deleteAllConsumerGroupInfoForTopicOpt = parser.accepts("delete-all-consumer-group-info-for-topic", + "Delete topic partition offsets and ownership information for every consumer group. " + + "WARNING: Assumes there are no active producers and consumers on the topic.") + val options = parser.parse(args : _*) + + val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteAllConsumerGroupInfoForTopicOpt) + + def checkArgs() { + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + if (options.has(describeOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, groupOpt) + if (options.has(deleteAllConsumerGroupInfoForTopicOpt)) + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) + + // check invalid args + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt) + CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteAllConsumerGroupInfoForTopicOpt) + CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt) + CommandLineUtils.checkInvalidArgs(parser, options, forceDeleteOpt, allConsumerGroupLevelOpts - deleteAllConsumerGroupInfoForTopicOpt) + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index c14bd45..b1c4d3f 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -749,6 +749,26 @@ object ZkUtils extends Logging { }.flatten.toSet } } + + def getConsumerGroups(zkClient: ZkClient) = { + zkClient.getChildren(ConsumersPath) + } + + def getTopicsByConsumerGroup(zkClient: ZkClient,consumerGroup:String) = { + zkClient.getChildren(ConsumersPath+"/"+consumerGroup+"/owners") + } + + def getAllConsumerGroupsForTopic(zkClient: ZkClient, topic: String): Set[String] = { + val groups = ZkUtils.getChildrenParentMayNotExist(zkClient, ConsumersPath) + if (groups == null) Set.empty + else { + groups.foldLeft(Set.empty[String]) {(consumerGroupsForTopic, group) => + val topics = getChildren(zkClient, new ZKGroupDirs(group).consumerGroupDir + "/offsets") + if (topics.contains(topic)) consumerGroupsForTopic + group + else consumerGroupsForTopic + } + } + } } object ZKStringSerializer extends ZkSerializer { diff --git a/core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala new file mode 100644 index 0000000..ef008fc --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/DeleteAllConsumerGroupInfoForTopicInZKTest.scala @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import org.scalatest.junit.JUnit3Suite +import junit.framework.Assert._ +import kafka.utils.{ZKGroupTopicDirs, ZkUtils, TestUtils} +import kafka.server.{KafkaServer, KafkaConfig} +import org.junit.Test +import kafka.consumer._ +import kafka.common.TopicAndPartition +import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer} +import kafka.integration.KafkaServerTestHarness + + +class DeleteAllConsumerGroupInfoForTopicInZKTest extends JUnit3Suite with KafkaServerTestHarness { + val configs = TestUtils.createBrokerConfigs(3, false, true).map(new KafkaConfig(_)) + + @Test + def testDoNothingWhenTopicExists() { + val topic = "test" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val groupTopicDirs = groups.map(group => new ZKGroupTopicDirs(group, topic)) + groupTopicDirs.foreach(dir => fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, "")) + + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic, forceDelete = false) + + TestUtils.waitUntilTrue(() => groupTopicDirs.forall(groupTopicDirsExist), + "DeleteAllConsumerGroupInfoForTopicInZK should do nothing when topic exists") + } + + @Test + def testDeleteAllConsumerGroupInfoWhenTopicExistsWithForceDelete() { + val topic = "test" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val groupTopicDirs = groups.map(group => new ZKGroupTopicDirs(group, topic)) + groupTopicDirs.foreach(dir => fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, "")) + + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic, forceDelete = true) + + TestUtils.waitUntilTrue(() => !groupTopicDirs.forall(groupTopicDirsExist), + "Consumer group info on existing topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK with forceDelete") + } + + @Test + def testDeleteAllConsumerGroupInfoForTopicAfterDeleteTopic() { + val topicToDelete = "topicToDelete" + val otherTopic = "otherTopic" + val groups = Seq("group1", "group2") + + TestUtils.createTopic(zkClient, topicToDelete, 1, 3, servers) + TestUtils.createTopic(zkClient, otherTopic, 1, 3, servers) + val groupTopicDirsForTopicToDelete = groups.map(group => new ZKGroupTopicDirs(group, topicToDelete)) + val groupTopicDirsForOtherTopic = groups.map(group => new ZKGroupTopicDirs(group, otherTopic)) + groupTopicDirsForTopicToDelete.foreach(dir => fillInConsumerGroupInfo(topicToDelete, dir.group, "consumer", 0, 10, "")) + groupTopicDirsForOtherTopic.foreach(dir => fillInConsumerGroupInfo(otherTopic, dir.group, "consumer", 0, 10, "")) + + AdminUtils.deleteTopic(zkClient, topicToDelete) + verifyTopicDeletion(topicToDelete, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topicToDelete, forceDelete = false) + + TestUtils.waitUntilTrue(() => !groupTopicDirsForTopicToDelete.exists(groupTopicDirsExist), + "Consumer group info on deleted topic topic should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + TestUtils.waitUntilTrue(() => groupTopicDirsForOtherTopic.forall(groupTopicDirsExist), + "Consumer group info on unrelated topics should not be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + } + + @Test + def testConsumptionOnRecreatedTopicAfterDeleteAllConsumerGroupInfoForTopicInZK() { + val topic = "topic" + val group = "group" + + TestUtils.createTopic(zkClient, topic, 1, 3, servers) + val dir = new ZKGroupTopicDirs(group, topic) + fillInConsumerGroupInfo(topic, dir.group, "consumer", 0, 10, "") + + AdminUtils.deleteTopic(zkClient, topic) + verifyTopicDeletion(topic, servers) + AdminUtils.deleteAllConsumerGroupInfoForTopicInZK(zkClient, topic, forceDelete = false) + + TestUtils.waitUntilTrue(() => !groupTopicDirsExist(dir), + "Consumer group info on related topics should be deleted by DeleteAllConsumerGroupInfoForTopicInZK") + //produce events + val producer = TestUtils.createNewProducer(brokerList) + produceEvents(producer, topic, List.fill(10)("test")) + + //consume events + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, "consumer") + consumerProps.put("auto.commit.enable", "false") + consumerProps.put("auto.offset.reset", "smallest") + consumerProps.put("consumer.timeout.ms", "2000") + consumerProps.put("fetch.wait.max.ms", "0") + val consumerConfig = new ConsumerConfig(consumerProps) + val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig) + val messageStream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head + consumeEvents(messageStream, 5) + consumerConnector.commitOffsets(false) + producer.close() + consumerConnector.shutdown() + + TestUtils.waitUntilTrue(() => groupTopicDirsExist(dir), "Consumer group info should exist after consuming from a recreated topic") + } + + private def fillInConsumerGroupInfo(topic: String, group: String, consumerId: String, partition: Int, offset: Int, data: String) { + val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, consumerId) + val consumerConfig = new ConsumerConfig(consumerProps) + val dir = new ZKGroupTopicDirs(group, topic) + TestUtils.updateConsumerOffset(consumerConfig, dir.consumerOffsetDir + "/" + partition, offset) + ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.getConsumerPartitionOwnerPath(group, topic, partition), data) + } + + private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) { + val topicAndPartition = TopicAndPartition(topic, 0) + // wait until admin path for delete topic is deleted, signaling completion of topic deletion + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), + "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted") + TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), + "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted") + // ensure that the topic-partition has been deleted from all brokers' replica managers + TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.replicaManager.getPartition(topic, 0) == None), + "Replica manager's should have deleted all of this topic's partitions") + // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper + assertTrue("Replica logs not deleted after delete topic is complete", + servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty)) + } + + private def groupTopicDirsExist(dir: ZKGroupTopicDirs) = { + ZkUtils.pathExists(zkClient, dir.consumerOffsetDir) && ZkUtils.pathExists(zkClient, dir.consumerOwnerDir) + } + + private def produceEvents(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, messages: List[String]) { + messages.foreach(message => producer.send(new ProducerRecord(topic, message.getBytes))) + } + + private def consumeEvents(messageStream: KafkaStream[Array[Byte], Array[Byte]], n: Int) { + val iter = messageStream.iterator + (0 until n).foreach(_ => iter.next) + } +} diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ac15d34..e4780bf 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -139,9 +139,10 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfigs(numConfigs: Int, - enableControlledShutdown: Boolean = true): List[Properties] = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): List[Properties] = { for((port, node) <- choosePorts(numConfigs).zipWithIndex) - yield createBrokerConfig(node, port, enableControlledShutdown) + yield createBrokerConfig(node, port, enableControlledShutdown, enableDeleteTopic) } def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = { @@ -152,7 +153,8 @@ object TestUtils extends Logging { * Create a test config for the given node id */ def createBrokerConfig(nodeId: Int, port: Int = choosePort(), - enableControlledShutdown: Boolean = true): Properties = { + enableControlledShutdown: Boolean = true, + enableDeleteTopic: Boolean = false): Properties = { val props = new Properties if (nodeId >= 0) props.put("broker.id", nodeId.toString) props.put("host.name", "localhost") @@ -161,6 +163,7 @@ object TestUtils extends Logging { props.put("zookeeper.connect", TestZKUtils.zookeeperConnect) props.put("replica.socket.timeout.ms", "1500") props.put("controlled.shutdown.enable", enableControlledShutdown.toString) + props.put("delete.topic.enable", enableDeleteTopic.toString) props } -- 1.7.12.4