From 5a9dea60f8926a0da6332c7cc448dc559bb1397a Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 15 Oct 2014 11:59:55 -0700 Subject: [PATCH 1/4] KAFKA-559 Add tool to garbage collect old consumer metadata entries. --- bin/kafka-cleanup-obsolete-zk-entires.sh | 19 ++ .../kafka/tools/CleanupObsoleteZkEntires.scala | 316 +++++++++++++++++++++ 2 files changed, 335 insertions(+) create mode 100644 bin/kafka-cleanup-obsolete-zk-entires.sh create mode 100644 core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala diff --git a/bin/kafka-cleanup-obsolete-zk-entires.sh b/bin/kafka-cleanup-obsolete-zk-entires.sh new file mode 100644 index 0000000..f2c0cb8 --- /dev/null +++ b/bin/kafka-cleanup-obsolete-zk-entires.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +base_dir=$(dirname $0) +export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" +$base_dir/kafka-run-class.sh kafka.tools.CleanupObsoleteZkEntires $@ diff --git a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala new file mode 100644 index 0000000..2412c9b --- /dev/null +++ b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.tools + +import joptsimple.OptionParser +import kafka.utils._ +import kafka.utils.ZkUtils._ +import org.apache.zookeeper.data.Stat +import org.I0Itec.zkclient.ZkClient + +/** + * This tool is removes obsolete znodes from Zookeeper server. Many use cases involve transient consumers + * which end up creating bunch of entries (group and offsets) in Zookeeper. This tool can perform cleanup + * based on topic name or group name. + * + * If topic name is provided as input, it would scan through consumer group entries and delete any that + * had no offset update since the given date. 
+ * + * If group name is provided as input, it would scan through all the topics in the group and perform + * deletion if there were no updates since the time provided. + */ + +object CleanupObsoleteZkEntires extends Logging { + + /* These are some default values used for zookeeper connection */ + private val ZkSessionTimeout: Int = 3000 + private val ZkConnectionTimeout: Int = 3000 + + /* The zookeeper connection string (host:port) */ + private var zkConnect: String = null + + /* The mode in which the tool would run */ + private var deleteBy: String = null + + /* The name of the topic or group to be cleaned up */ + private var topicOrGroupName: String = null + + /* Time threshold. Znodes with modified time < 'since' are deleted */ + private var since: Long = -1 + + /* A flag which indicates if the tool runs in passive mode w/o actually deleting anything */ + private var dryRun: Boolean = false + + def main(args: Array[String]) { + var zkClient: ZkClient = null + + try { + parseArguments(args) + info("Connecting to zookeeper instance at " + zkConnect) + zkClient = new ZkClient(zkConnect, ZkSessionTimeout, ZkConnectionTimeout, ZKStringSerializer) + + if(deleteBy == "group") + removeObsoleteConsumerGroups(zkClient, topicOrGroupName, since) + else + removeObsoleteConsumerTopics(zkClient, topicOrGroupName, since) + + info("Kafka obsolete Zk entires cleanup tool shutdown successfully.") + } catch { + case e => + warn("removal failed because of " + e.getMessage) + warn(Utils.stackTrace(e)) + } finally { + if (zkClient != null) + zkClient.close() + } + } + + /** + * Performs 2 initial checks and then based on the value of time threshold provided, + * would delete the obsolete topics in the group + * + * @param zkClient Zookeeper client object which is already connected + * @param groupID The group-ID + * @param since The time threshold considered for deletion + */ + def removeObsoleteConsumerGroups(zkClient: ZkClient, groupID: String, since: Long) + { + val dirs: 
ZKGroupDirs = new ZKGroupDirs(groupID) + + if(!pathExists(zkClient, dirs.consumerGroupDir)) { + warn("Path " + dirs.consumerGroupDir + " doesn't exist on the zookeeper instance. Aborted.") + return + } + + if(checkIfLiveConsumers(zkClient, groupID)) { + warn("Aborted.") + return + } + + if(since == -1) { + // If no threshold for time provided, delete entire group + deleteZNodeRecursive(zkClient, dirs.consumerGroupDir) + } + else { + var childTopics = getChildren(zkClient, dirs.consumerGroupDir + "/offsets") + var numChildrenTopics:Int = childTopics.length + + for(topic <- childTopics) { + val topicPaths = new ZKGroupTopicDirs(groupID, topic) + numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, since, numChildrenTopics) + } + deleteGroupIfNoTopics(zkClient, dirs, numChildrenTopics) + } + } + + /** + * Scans all the groups to find those which have the required topic. If the topic is present and + * there are no active consumers in that group, it would perform the deletion considering the time + * threshold + * + * @param zkClient Zookeeper client object which is already connected + * @param topic The topic + * @param since The time threshold considered for deletion + */ + def removeObsoleteConsumerTopics(zkClient: ZkClient, topic: String, since: Long) + { + var stat = new Stat() + var childGroups = getChildren(zkClient, ZkUtils.ConsumersPath) + + for(group <- childGroups) { + val topicPaths = new ZKGroupTopicDirs(group, topic) + + if(pathExists(zkClient, topicPaths.consumerOffsetDir) && !checkIfLiveConsumers(zkClient, group)) { + zkClient.readData(topicPaths.consumerGroupDir + "/offsets", stat) + var numChildrenTopics: Int = stat.getNumChildren() + + if(since == -1) { // If no time threshold provided, delete entire topic + deleteZNodeRecursive(zkClient, topicPaths.consumerOffsetDir) + numChildrenTopics = numChildrenTopics - 1 + } + else { + numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, since, numChildrenTopics) + } + 
deleteGroupIfNoTopics(zkClient, topicPaths, numChildrenTopics) + } + } + } + + /** + * Scans the "broker-partition" entries inside a topic and performs deletion if there were no + * updates since the threshold date provided. Also, deletes the topic itself if there are no more + * child entries under it. + * + * @param zkClient Zookeeper client object which is already connected + * @param dirs A convenient ZKGroupTopicDirs object for quick access to paths + * @param since The time threshold considered for deletion + */ + def removeBrokerPartitionPairs(zkClient: ZkClient, dirs: ZKGroupTopicDirs, since: Long, + numChildrenTopics: Int) : Int = { + var stat = new Stat() + var childBrokerPartitionPair = getChildren(zkClient, dirs.consumerOffsetDir) + var numChildrenPairs:Int = childBrokerPartitionPair.length + + for(brokerPartitionPair <- childBrokerPartitionPair) { + val brokerPartitionPath:String = dirs.consumerOffsetDir + "/" + brokerPartitionPair + zkClient.readData(brokerPartitionPath, stat) + debug("modified time for " + brokerPartitionPath + " is " + stat.getMtime()) + + // delete the node if was never modified after 'since', the threshold timestamp + if(stat.getMtime() < since) { + deleteZNode(zkClient, brokerPartitionPath) + numChildrenPairs = numChildrenPairs - 1 + } + } + + // if the topic is empty, then we can delete it + if(numChildrenPairs == 0) { + deleteZNode(zkClient, dirs.consumerOffsetDir) + return (numChildrenTopics - 1) + } + return numChildrenTopics + } + + /** + * If there are no more topics under this group, delete the entire group + * + * @param zkClient Zookeeper client object which is already connected + * @param dirs A convenient ZKGroupDirs object for quick access to consumer paths + * @param numTopics Number of topics present + */ + def deleteGroupIfNoTopics(zkClient: ZkClient, dirs: ZKGroupDirs, numTopics: Int) { + if(numTopics == 0) { + debug("No topics left in the group \"" + dirs.consumerGroupDir + "\".") + deleteZNodeRecursive(zkClient, 
dirs.consumerGroupDir) + } + } + + /** + * Performs a defensive check to ensure that there are no consumers currently registered under the group + * + * @param zkClient Zookeeper client object which is already connected + * @param groupID The group-ID + * @return Boolean indicating if there are live comsumers or not + */ + def checkIfLiveConsumers(zkClient: ZkClient, groupID: String) : Boolean = { + val activeConsumers = getConsumersInGroup(zkClient, groupID) + + if(!activeConsumers.isEmpty) { + warn("The group \"" + groupID + "\" has active consumer(s).") + return true + } + debug("No live consumers found for group \"" + groupID + "\".") + return false + } + + /** + * Delete a znode if "dry-run" is off. + * + * @param zkClient Zookeeper client object which is already connected + * @param path The path of znode to be deleted + */ + def deleteZNode(zkClient: ZkClient, path: String) { + info("Deleting \"" + path + "\".") + if(!dryRun) + deletePath(zkClient, path) + } + + /** + * Delete a znode recursively if "dry-run" is off. + * + * @param zkClient Zookeeper client object which is already connected + * @param path The path of znode to be deleted + */ + def deleteZNodeRecursive(zkClient: ZkClient, path: String) { + info("Deleting \"" + path + "\" recursively.") + if(!dryRun) + deletePathRecursive(zkClient, path) + } + + /** + * Parse the i/p arguments and set the member variables + * + * @param args Input arguments to the tool + */ + def parseArguments(args: Array[String]) { + + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED paramter. The connection string " + + "(host:port) for zookeeper connection.") + .withRequiredArg + .describedAs("zookeeper") + .ofType(classOf[String]) + + val deleteByOpt = parser.accepts("delete-by", "REQUIRED parameter. 
Valid values are: " + + "\"topic\" and \"group\" indicate deletion based on " + + "a specific topic/group respectively.") + .withRequiredArg + .describedAs("delete-by") + .ofType(classOf[String]) + + val topicOrGroupOpt = parser.accepts("name", "REQUIRED paramter. Provide the " + + "name of the topic or group, based on \"delete-by\", " + + "which you want to cleanup.") + .withRequiredArg + .describedAs("name") + .ofType(classOf[String]) + + val sinceOpt = parser.accepts("since", "REQUIRED paramter. Time elapsed since the epoch " + + "considered as threshold. Znodes with modified time before " + + "this timestamp are deleted. Use \"-1\" to skip the check " + + "for modified time.") + .withRequiredArg + .describedAs("since") + .ofType(classOf[String]) + + val dryRunOpt = parser.accepts("dry-run", "OPTIONAL paramter. Passing \"--dry-run\" will " + + "cause this tool to run in passive mode w/o actually deleting " + + "anything from zookeeper but logging all the activities that it " + + "would perform. 
It is highly recommended to use this setting if " + + "you don't want to risk deleting things and just want to see " + + "znodes which are eligible for deletion.") + .withOptionalArg + .describedAs("dry-run") + .ofType(classOf[String]) + + val options = parser.parse(args : _*) + + for(arg <- List(deleteByOpt, topicOrGroupOpt, zkConnectOpt, sinceOpt, topicOrGroupOpt)) { + if(!options.has(arg)) { + System.err.println("Missing required argument \"" + arg + "\"") + parser.printHelpOn(System.err) + System.exit(1) + } + } + + deleteBy = options.valueOf(deleteByOpt) + if (deleteBy != "topic" && deleteBy != "group") { + System.err.println("Invalid value of \"delete-by\" passed : \"" + deleteBy + "\"") + System.exit(1) + } + + topicOrGroupName = options.valueOf(topicOrGroupOpt) + zkConnect = options.valueOf(zkConnectOpt) + since = options.valueOf(sinceOpt).toLong + + if (options.has(dryRunOpt)) { + dryRun = true + this.logIdent = "[dry-run] " + } + } +} -- 2.1.2 From 828931e4bb12aefe90b49f68cf60d7523833928b Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 23 Oct 2014 11:15:42 -0700 Subject: [PATCH 2/4] Addressing Joel's comments. 
--- bin/kafka-cleanup-obsolete-zk-entires.sh | 0 .../kafka/tools/CleanupObsoleteZkEntires.scala | 161 +++++++++++---------- 2 files changed, 88 insertions(+), 73 deletions(-) mode change 100644 => 100755 bin/kafka-cleanup-obsolete-zk-entires.sh diff --git a/bin/kafka-cleanup-obsolete-zk-entires.sh b/bin/kafka-cleanup-obsolete-zk-entires.sh old mode 100644 new mode 100755 diff --git a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala index 2412c9b..bb16b59 100644 --- a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala +++ b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala @@ -22,6 +22,7 @@ import kafka.utils._ import kafka.utils.ZkUtils._ import org.apache.zookeeper.data.Stat import org.I0Itec.zkclient.ZkClient +import java.text.{ParseException, SimpleDateFormat} /** * This tool is removes obsolete znodes from Zookeeper server. Many use cases involve transient consumers @@ -50,8 +51,8 @@ object CleanupObsoleteZkEntires extends Logging { /* The name of the topic or group to be cleaned up */ private var topicOrGroupName: String = null - /* Time threshold. Znodes with modified time < 'since' are deleted */ - private var since: Long = -1 + /* Time threshold. 
Znodes with modified time < 'before' are deleted */ + private var before: Long = -1 /* A flag which indicates if the tool runs in passive mode w/o actually deleting anything */ private var dryRun: Boolean = false @@ -63,16 +64,21 @@ object CleanupObsoleteZkEntires extends Logging { parseArguments(args) info("Connecting to zookeeper instance at " + zkConnect) zkClient = new ZkClient(zkConnect, ZkSessionTimeout, ZkConnectionTimeout, ZKStringSerializer) - - if(deleteBy == "group") - removeObsoleteConsumerGroups(zkClient, topicOrGroupName, since) - else - removeObsoleteConsumerTopics(zkClient, topicOrGroupName, since) - info("Kafka obsolete Zk entires cleanup tool shutdown successfully.") + deleteBy match { + case "group" => + removeObsoleteConsumerGroups(zkClient, topicOrGroupName, before) + case "topic" => + removeObsoleteConsumerTopics(zkClient, topicOrGroupName, before) + case _ => + System.err.println("Invalid value of \"delete-by\" passed. Valid options are \"topic\" or \"group\"") + System.exit(1) + } + + info("Kafka obsolete Zk entries cleanup tool shutdown successfully.") } catch { - case e => - warn("removal failed because of " + e.getMessage) + case e: Throwable => + warn("Removal failed.", e) warn(Utils.stackTrace(e)) } finally { if (zkClient != null) @@ -86,10 +92,11 @@ object CleanupObsoleteZkEntires extends Logging { * * @param zkClient Zookeeper client object which is already connected * @param groupID The group-ID - * @param since The time threshold considered for deletion + * @param before The time threshold considered for deletion */ - def removeObsoleteConsumerGroups(zkClient: ZkClient, groupID: String, since: Long) + private def removeObsoleteConsumerGroups(zkClient: ZkClient, groupID: String, before: Long) { + debug("Removing obsolete consumer groups from group {0} since {1}".format(groupID, before)) val dirs: ZKGroupDirs = new ZKGroupDirs(groupID) if(!pathExists(zkClient, dirs.consumerGroupDir)) { @@ -98,21 +105,21 @@ object 
CleanupObsoleteZkEntires extends Logging { } if(checkIfLiveConsumers(zkClient, groupID)) { - warn("Aborted.") + warn("Aborted because active consumers were found in the group.") return } - if(since == -1) { + if(before == -1) { // If no threshold for time provided, delete entire group deleteZNodeRecursive(zkClient, dirs.consumerGroupDir) } else { - var childTopics = getChildren(zkClient, dirs.consumerGroupDir + "/offsets") + val childTopics = getChildren(zkClient, dirs.consumerGroupDir + "/offsets") var numChildrenTopics:Int = childTopics.length for(topic <- childTopics) { val topicPaths = new ZKGroupTopicDirs(groupID, topic) - numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, since, numChildrenTopics) + numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) } deleteGroupIfNoTopics(zkClient, dirs, numChildrenTopics) } @@ -125,12 +132,14 @@ object CleanupObsoleteZkEntires extends Logging { * * @param zkClient Zookeeper client object which is already connected * @param topic The topic - * @param since The time threshold considered for deletion + * @param before The time threshold considered for deletion */ - def removeObsoleteConsumerTopics(zkClient: ZkClient, topic: String, since: Long) + private def removeObsoleteConsumerTopics(zkClient: ZkClient, topic: String, before: Long) { - var stat = new Stat() - var childGroups = getChildren(zkClient, ZkUtils.ConsumersPath) + debug("Removing obsolete consumer groups from topic {0} before {1}".format(topic, before)) + + val stat = new Stat() + val childGroups = getChildren(zkClient, ZkUtils.ConsumersPath) for(group <- childGroups) { val topicPaths = new ZKGroupTopicDirs(group, topic) @@ -139,12 +148,12 @@ object CleanupObsoleteZkEntires extends Logging { zkClient.readData(topicPaths.consumerGroupDir + "/offsets", stat) var numChildrenTopics: Int = stat.getNumChildren() - if(since == -1) { // If no time threshold provided, delete entire topic + if(before == -1) { // 
If no time threshold provided, delete entire topic deleteZNodeRecursive(zkClient, topicPaths.consumerOffsetDir) numChildrenTopics = numChildrenTopics - 1 } else { - numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, since, numChildrenTopics) + numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) } deleteGroupIfNoTopics(zkClient, topicPaths, numChildrenTopics) } @@ -158,32 +167,35 @@ object CleanupObsoleteZkEntires extends Logging { * * @param zkClient Zookeeper client object which is already connected * @param dirs A convenient ZKGroupTopicDirs object for quick access to paths - * @param since The time threshold considered for deletion + * @param before The time threshold considered for deletion */ - def removeBrokerPartitionPairs(zkClient: ZkClient, dirs: ZKGroupTopicDirs, since: Long, - numChildrenTopics: Int) : Int = { - var stat = new Stat() - var childBrokerPartitionPair = getChildren(zkClient, dirs.consumerOffsetDir) - var numChildrenPairs:Int = childBrokerPartitionPair.length - - for(brokerPartitionPair <- childBrokerPartitionPair) { - val brokerPartitionPath:String = dirs.consumerOffsetDir + "/" + brokerPartitionPair + private def removeBrokerPartitionPairs(zkClient: ZkClient, dirs: ZKGroupTopicDirs, before: Long, + numChildrenTopics: Int) : Int = { + debug("Removing broker partition pairs {0} before {1}".format(dirs, before)) + + val stat = new Stat() + val childBrokerPartitionPair = getChildren(zkClient, dirs.consumerOffsetDir) + var numChildrenPairs: Int = childBrokerPartitionPair.length + + for (brokerPartitionPair <- childBrokerPartitionPair) { + val brokerPartitionPath: String = dirs.consumerOffsetDir + "/" + brokerPartitionPair zkClient.readData(brokerPartitionPath, stat) - debug("modified time for " + brokerPartitionPath + " is " + stat.getMtime()) - - // delete the node if was never modified after 'since', the threshold timestamp - if(stat.getMtime() < since) { + debug("Modified time for 
" + brokerPartitionPath + " is " + stat.getMtime()) + + // delete the node if was never modified after 'before', the threshold timestamp + if (stat.getMtime() < before) { deleteZNode(zkClient, brokerPartitionPath) numChildrenPairs = numChildrenPairs - 1 } - } - + } + // if the topic is empty, then we can delete it - if(numChildrenPairs == 0) { + if (numChildrenPairs == 0) { deleteZNode(zkClient, dirs.consumerOffsetDir) - return (numChildrenTopics - 1) + (numChildrenTopics - 1) + } else { + numChildrenTopics } - return numChildrenTopics } /** @@ -193,7 +205,7 @@ object CleanupObsoleteZkEntires extends Logging { * @param dirs A convenient ZKGroupDirs object for quick access to consumer paths * @param numTopics Number of topics present */ - def deleteGroupIfNoTopics(zkClient: ZkClient, dirs: ZKGroupDirs, numTopics: Int) { + private def deleteGroupIfNoTopics(zkClient: ZkClient, dirs: ZKGroupDirs, numTopics: Int) { if(numTopics == 0) { debug("No topics left in the group \"" + dirs.consumerGroupDir + "\".") deleteZNodeRecursive(zkClient, dirs.consumerGroupDir) @@ -207,15 +219,16 @@ object CleanupObsoleteZkEntires extends Logging { * @param groupID The group-ID * @return Boolean indicating if there are live comsumers or not */ - def checkIfLiveConsumers(zkClient: ZkClient, groupID: String) : Boolean = { + private def checkIfLiveConsumers(zkClient: ZkClient, groupID: String) : Boolean = { val activeConsumers = getConsumersInGroup(zkClient, groupID) if(!activeConsumers.isEmpty) { warn("The group \"" + groupID + "\" has active consumer(s).") - return true + true + } else { + debug("No live consumers found for group \"" + groupID + "\".") + false } - debug("No live consumers found for group \"" + groupID + "\".") - return false } /** @@ -224,7 +237,7 @@ object CleanupObsoleteZkEntires extends Logging { * @param zkClient Zookeeper client object which is already connected * @param path The path of znode to be deleted */ - def deleteZNode(zkClient: ZkClient, path: String) { + 
private def deleteZNode(zkClient: ZkClient, path: String) { info("Deleting \"" + path + "\".") if(!dryRun) deletePath(zkClient, path) @@ -236,7 +249,7 @@ object CleanupObsoleteZkEntires extends Logging { * @param zkClient Zookeeper client object which is already connected * @param path The path of znode to be deleted */ - def deleteZNodeRecursive(zkClient: ZkClient, path: String) { + private def deleteZNodeRecursive(zkClient: ZkClient, path: String) { info("Deleting \"" + path + "\" recursively.") if(!dryRun) deletePathRecursive(zkClient, path) @@ -247,10 +260,12 @@ object CleanupObsoleteZkEntires extends Logging { * * @param args Input arguments to the tool */ - def parseArguments(args: Array[String]) { - + private def parseArguments(args: Array[String]) { + val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" + val dateFormat = new SimpleDateFormat(dateFormatString) + val parser = new OptionParser - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED paramter. The connection string " + + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED parameter. The connection string " + "(host:port) for zookeeper connection.") .withRequiredArg .describedAs("zookeeper") @@ -263,50 +278,50 @@ object CleanupObsoleteZkEntires extends Logging { .describedAs("delete-by") .ofType(classOf[String]) - val topicOrGroupOpt = parser.accepts("name", "REQUIRED paramter. Provide the " + + val topicOrGroupOpt = parser.accepts("name", "REQUIRED parameter. Provide the " + "name of the topic or group, based on \"delete-by\", " + "which you want to cleanup.") .withRequiredArg .describedAs("name") .ofType(classOf[String]) - val sinceOpt = parser.accepts("since", "REQUIRED paramter. Time elapsed since the epoch " + - "considered as threshold. Znodes with modified time before " + - "this timestamp are deleted. Use \"-1\" to skip the check " + - "for modified time.") + val beforeOpt = parser.accepts("before", "REQUIRED parameter. Date and time used as a threshold. 
" + + "Znodes with modified time before this timestamp are deleted. " + + "Formatted as " + dateFormatString + ", or use \"-1\" to skip the check.") .withRequiredArg - .describedAs("since") + .describedAs("before") .ofType(classOf[String]) - val dryRunOpt = parser.accepts("dry-run", "OPTIONAL paramter. Passing \"--dry-run\" will " + + val dryRunOpt = parser.accepts("dry-run", "OPTIONAL parameter. Passing \"--dry-run\" will " + "cause this tool to run in passive mode w/o actually deleting " + "anything from zookeeper but logging all the activities that it " + "would perform. It is highly recommended to use this setting if " + "you don't want to risk deleting things and just want to see " + "znodes which are eligible for deletion.") - .withOptionalArg - .describedAs("dry-run") - .ofType(classOf[String]) val options = parser.parse(args : _*) - for(arg <- List(deleteByOpt, topicOrGroupOpt, zkConnectOpt, sinceOpt, topicOrGroupOpt)) { - if(!options.has(arg)) { - System.err.println("Missing required argument \"" + arg + "\"") - parser.printHelpOn(System.err) - System.exit(1) - } - } + CommandLineUtils.checkRequiredArgs(parser, options, deleteByOpt, topicOrGroupOpt, zkConnectOpt, beforeOpt) deleteBy = options.valueOf(deleteByOpt) - if (deleteBy != "topic" && deleteBy != "group") { - System.err.println("Invalid value of \"delete-by\" passed : \"" + deleteBy + "\"") - System.exit(1) - } - topicOrGroupName = options.valueOf(topicOrGroupOpt) zkConnect = options.valueOf(zkConnectOpt) - since = options.valueOf(sinceOpt).toLong + val beforeStr = options.valueOf(beforeOpt) + before = + if (beforeStr == "-1") { + -1 + } + else { + try { + dateFormat.parse(beforeStr).getTime() + } catch { + case e: ParseException => { + System.err.println("\"before\" parameter could not be parsed, should be formatted as " + dateFormatString) + System.exit(1) + throw e + } + } + } if (options.has(dryRunOpt)) { dryRun = true -- 2.1.2 From 1351639d8e7dc2c4b5e869b05960951a82cc629a Mon Sep 17 00:00:00 
2001 From: Ewen Cheslack-Postava Date: Thu, 23 Oct 2014 15:03:10 -0700 Subject: [PATCH 3/4] Fix naming: entires -> entries. --- bin/kafka-cleanup-obsolete-zk-entires.sh | 19 -- .../kafka/tools/CleanupObsoleteZkEntires.scala | 331 --------------------- .../kafka/tools/CleanupObsoleteZkEntries.scala | 331 +++++++++++++++++++++ 3 files changed, 331 insertions(+), 350 deletions(-) delete mode 100755 bin/kafka-cleanup-obsolete-zk-entires.sh delete mode 100644 core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala create mode 100644 core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala diff --git a/bin/kafka-cleanup-obsolete-zk-entires.sh b/bin/kafka-cleanup-obsolete-zk-entires.sh deleted file mode 100755 index f2c0cb8..0000000 --- a/bin/kafka-cleanup-obsolete-zk-entires.sh +++ /dev/null @@ -1,19 +0,0 @@ -#!/bin/bash -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -base_dir=$(dirname $0) -export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties" -$base_dir/kafka-run-class.sh kafka.tools.CleanupObsoleteZkEntires $@ diff --git a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala deleted file mode 100644 index bb16b59..0000000 --- a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntires.scala +++ /dev/null @@ -1,331 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import joptsimple.OptionParser -import kafka.utils._ -import kafka.utils.ZkUtils._ -import org.apache.zookeeper.data.Stat -import org.I0Itec.zkclient.ZkClient -import java.text.{ParseException, SimpleDateFormat} - -/** - * This tool is removes obsolete znodes from Zookeeper server. Many use cases involve transient consumers - * which end up creating bunch of entries (group and offsets) in Zookeeper. This tool can perform cleanup - * based on topic name or group name. - * - * If topic name is provided as input, it would scan through consumer group entries and delete any that - * had no offset update since the given date. 
- * - * If group name is provided as input, it would scan through all the topics in the group and perform - * deletion if there were no updates since the time provided. - */ - -object CleanupObsoleteZkEntires extends Logging { - - /* These are some default values used for zookeeper connection */ - private val ZkSessionTimeout: Int = 3000 - private val ZkConnectionTimeout: Int = 3000 - - /* The zookeeper connection string (host:port) */ - private var zkConnect: String = null - - /* The mode in which the tool would run */ - private var deleteBy: String = null - - /* The name of the topic or group to be cleaned up */ - private var topicOrGroupName: String = null - - /* Time threshold. Znodes with modified time < 'before' are deleted */ - private var before: Long = -1 - - /* A flag which indicates if the tool runs in passive mode w/o actually deleting anything */ - private var dryRun: Boolean = false - - def main(args: Array[String]) { - var zkClient: ZkClient = null - - try { - parseArguments(args) - info("Connecting to zookeeper instance at " + zkConnect) - zkClient = new ZkClient(zkConnect, ZkSessionTimeout, ZkConnectionTimeout, ZKStringSerializer) - - deleteBy match { - case "group" => - removeObsoleteConsumerGroups(zkClient, topicOrGroupName, before) - case "topic" => - removeObsoleteConsumerTopics(zkClient, topicOrGroupName, before) - case _ => - System.err.println("Invalid value of \"delete-by\" passed. 
Valid options are \"topic\" or \"group\"") - System.exit(1) - } - - info("Kafka obsolete Zk entries cleanup tool shutdown successfully.") - } catch { - case e: Throwable => - warn("Removal failed.", e) - warn(Utils.stackTrace(e)) - } finally { - if (zkClient != null) - zkClient.close() - } - } - - /** - * Performs 2 initial checks and then based on the value of time threshold provided, - * would delete the obsolete topics in the group - * - * @param zkClient Zookeeper client object which is already connected - * @param groupID The group-ID - * @param before The time threshold considered for deletion - */ - private def removeObsoleteConsumerGroups(zkClient: ZkClient, groupID: String, before: Long) - { - debug("Removing obsolete consumer groups from group {0} since {1}".format(groupID, before)) - val dirs: ZKGroupDirs = new ZKGroupDirs(groupID) - - if(!pathExists(zkClient, dirs.consumerGroupDir)) { - warn("Path " + dirs.consumerGroupDir + " doesn't exist on the zookeeper instance. Aborted.") - return - } - - if(checkIfLiveConsumers(zkClient, groupID)) { - warn("Aborted because active consumers were found in the group.") - return - } - - if(before == -1) { - // If no threshold for time provided, delete entire group - deleteZNodeRecursive(zkClient, dirs.consumerGroupDir) - } - else { - val childTopics = getChildren(zkClient, dirs.consumerGroupDir + "/offsets") - var numChildrenTopics:Int = childTopics.length - - for(topic <- childTopics) { - val topicPaths = new ZKGroupTopicDirs(groupID, topic) - numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) - } - deleteGroupIfNoTopics(zkClient, dirs, numChildrenTopics) - } - } - - /** - * Scans all the groups to find those which have the required topic. 
If the topic is present and - * there are no active consumers in that group, it would perform the deletion considering the time - * threshold - * - * @param zkClient Zookeeper client object which is already connected - * @param topic The topic - * @param before The time threshold considered for deletion - */ - private def removeObsoleteConsumerTopics(zkClient: ZkClient, topic: String, before: Long) - { - debug("Removing obsolete consumer groups from topic {0} before {1}".format(topic, before)) - - val stat = new Stat() - val childGroups = getChildren(zkClient, ZkUtils.ConsumersPath) - - for(group <- childGroups) { - val topicPaths = new ZKGroupTopicDirs(group, topic) - - if(pathExists(zkClient, topicPaths.consumerOffsetDir) && !checkIfLiveConsumers(zkClient, group)) { - zkClient.readData(topicPaths.consumerGroupDir + "/offsets", stat) - var numChildrenTopics: Int = stat.getNumChildren() - - if(before == -1) { // If no time threshold provided, delete entire topic - deleteZNodeRecursive(zkClient, topicPaths.consumerOffsetDir) - numChildrenTopics = numChildrenTopics - 1 - } - else { - numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) - } - deleteGroupIfNoTopics(zkClient, topicPaths, numChildrenTopics) - } - } - } - - /** - * Scans the "broker-partition" entries inside a topic and performs deletion if there were no - * updates since the threshold date provided. Also, deletes the topic itself if there are no more - * child entries under it. 
- * - * @param zkClient Zookeeper client object which is already connected - * @param dirs A convenient ZKGroupTopicDirs object for quick access to paths - * @param before The time threshold considered for deletion - */ - private def removeBrokerPartitionPairs(zkClient: ZkClient, dirs: ZKGroupTopicDirs, before: Long, - numChildrenTopics: Int) : Int = { - debug("Removing broker partition pairs {0} before {1}".format(dirs, before)) - - val stat = new Stat() - val childBrokerPartitionPair = getChildren(zkClient, dirs.consumerOffsetDir) - var numChildrenPairs: Int = childBrokerPartitionPair.length - - for (brokerPartitionPair <- childBrokerPartitionPair) { - val brokerPartitionPath: String = dirs.consumerOffsetDir + "/" + brokerPartitionPair - zkClient.readData(brokerPartitionPath, stat) - debug("Modified time for " + brokerPartitionPath + " is " + stat.getMtime()) - - // delete the node if was never modified after 'before', the threshold timestamp - if (stat.getMtime() < before) { - deleteZNode(zkClient, brokerPartitionPath) - numChildrenPairs = numChildrenPairs - 1 - } - } - - // if the topic is empty, then we can delete it - if (numChildrenPairs == 0) { - deleteZNode(zkClient, dirs.consumerOffsetDir) - (numChildrenTopics - 1) - } else { - numChildrenTopics - } - } - - /** - * If there are no more topics under this group, delete the entire group - * - * @param zkClient Zookeeper client object which is already connected - * @param dirs A convenient ZKGroupDirs object for quick access to consumer paths - * @param numTopics Number of topics present - */ - private def deleteGroupIfNoTopics(zkClient: ZkClient, dirs: ZKGroupDirs, numTopics: Int) { - if(numTopics == 0) { - debug("No topics left in the group \"" + dirs.consumerGroupDir + "\".") - deleteZNodeRecursive(zkClient, dirs.consumerGroupDir) - } - } - - /** - * Performs a defensive check to ensure that there are no consumers currently registered under the group - * - * @param zkClient Zookeeper client object which is 
already connected - * @param groupID The group-ID - * @return Boolean indicating if there are live comsumers or not - */ - private def checkIfLiveConsumers(zkClient: ZkClient, groupID: String) : Boolean = { - val activeConsumers = getConsumersInGroup(zkClient, groupID) - - if(!activeConsumers.isEmpty) { - warn("The group \"" + groupID + "\" has active consumer(s).") - true - } else { - debug("No live consumers found for group \"" + groupID + "\".") - false - } - } - - /** - * Delete a znode if "dry-run" is off. - * - * @param zkClient Zookeeper client object which is already connected - * @param path The path of znode to be deleted - */ - private def deleteZNode(zkClient: ZkClient, path: String) { - info("Deleting \"" + path + "\".") - if(!dryRun) - deletePath(zkClient, path) - } - - /** - * Delete a znode recursively if "dry-run" is off. - * - * @param zkClient Zookeeper client object which is already connected - * @param path The path of znode to be deleted - */ - private def deleteZNodeRecursive(zkClient: ZkClient, path: String) { - info("Deleting \"" + path + "\" recursively.") - if(!dryRun) - deletePathRecursive(zkClient, path) - } - - /** - * Parse the i/p arguments and set the member variables - * - * @param args Input arguments to the tool - */ - private def parseArguments(args: Array[String]) { - val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" - val dateFormat = new SimpleDateFormat(dateFormatString) - - val parser = new OptionParser - val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED parameter. The connection string " + - "(host:port) for zookeeper connection.") - .withRequiredArg - .describedAs("zookeeper") - .ofType(classOf[String]) - - val deleteByOpt = parser.accepts("delete-by", "REQUIRED parameter. 
Valid values are: " + - "\"topic\" and \"group\" indicate deletion based on " + - "a specific topic/group respectively.") - .withRequiredArg - .describedAs("delete-by") - .ofType(classOf[String]) - - val topicOrGroupOpt = parser.accepts("name", "REQUIRED parameter. Provide the " + - "name of the topic or group, based on \"delete-by\", " + - "which you want to cleanup.") - .withRequiredArg - .describedAs("name") - .ofType(classOf[String]) - - val beforeOpt = parser.accepts("before", "REQUIRED parameter. Date and time used as a threshold. " + - "Znodes with modified time before this timestamp are deleted. " + - "Formatted as " + dateFormatString + ", or use \"-1\" to skip the check.") - .withRequiredArg - .describedAs("before") - .ofType(classOf[String]) - - val dryRunOpt = parser.accepts("dry-run", "OPTIONAL parameter. Passing \"--dry-run\" will " + - "cause this tool to run in passive mode w/o actually deleting " + - "anything from zookeeper but logging all the activities that it " + - "would perform. 
It is highly recommended to use this setting if " + - "you don't want to risk deleting things and just want to see " + - "znodes which are eligible for deletion.") - - val options = parser.parse(args : _*) - - CommandLineUtils.checkRequiredArgs(parser, options, deleteByOpt, topicOrGroupOpt, zkConnectOpt, beforeOpt) - - deleteBy = options.valueOf(deleteByOpt) - topicOrGroupName = options.valueOf(topicOrGroupOpt) - zkConnect = options.valueOf(zkConnectOpt) - val beforeStr = options.valueOf(beforeOpt) - before = - if (beforeStr == "-1") { - -1 - } - else { - try { - dateFormat.parse(beforeStr).getTime() - } catch { - case e: ParseException => { - System.err.println("\"before\" parameter could not be parsed, should be formatted as " + dateFormatString) - System.exit(1) - throw e - } - } - } - - if (options.has(dryRunOpt)) { - dryRun = true - this.logIdent = "[dry-run] " - } - } -} diff --git a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala new file mode 100644 index 0000000..4271e90 --- /dev/null +++ b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.tools + +import joptsimple.OptionParser +import kafka.utils._ +import kafka.utils.ZkUtils._ +import org.apache.zookeeper.data.Stat +import org.I0Itec.zkclient.ZkClient +import java.text.{ParseException, SimpleDateFormat} + +/** + * This tool removes obsolete znodes from the Zookeeper server. Many use cases involve transient consumers + * which end up creating a bunch of entries (group and offsets) in Zookeeper. This tool can perform cleanup + * based on topic name or group name. + * + * If topic name is provided as input, it would scan through consumer group entries and delete any that + * had no offset update since the given date. + * + * If group name is provided as input, it would scan through all the topics in the group and perform + * deletion if there were no updates since the time provided. + */ + +object CleanupObsoleteZkEntries extends Logging { + + /* These are some default values used for zookeeper connection */ + private val ZkSessionTimeout: Int = 3000 + private val ZkConnectionTimeout: Int = 3000 + + /* The zookeeper connection string (host:port) */ + private var zkConnect: String = null + + /* The mode in which the tool would run */ + private var deleteBy: String = null + + /* The name of the topic or group to be cleaned up */ + private var topicOrGroupName: String = null + + /* Time threshold. 
Znodes with modified time < 'before' are deleted */ + private var before: Long = -1 + + /* A flag which indicates if the tool runs in passive mode w/o actually deleting anything */ + private var dryRun: Boolean = false + + def main(args: Array[String]) { + var zkClient: ZkClient = null + + try { + parseArguments(args) + info("Connecting to zookeeper instance at " + zkConnect) + zkClient = new ZkClient(zkConnect, ZkSessionTimeout, ZkConnectionTimeout, ZKStringSerializer) + + deleteBy match { + case "group" => + removeObsoleteConsumerGroups(zkClient, topicOrGroupName, before) + case "topic" => + removeObsoleteConsumerTopics(zkClient, topicOrGroupName, before) + case _ => + System.err.println("Invalid value of \"delete-by\" passed. Valid options are \"topic\" or \"group\"") + System.exit(1) + } + + info("Kafka obsolete Zk entries cleanup tool shutdown successfully.") + } catch { + case e: Throwable => + warn("Removal failed.", e) + warn(Utils.stackTrace(e)) + } finally { + if (zkClient != null) + zkClient.close() + } + } + + /** + * Performs 2 initial checks and then based on the value of time threshold provided, + * would delete the obsolete topics in the group + * + * @param zkClient Zookeeper client object which is already connected + * @param groupID The group-ID + * @param before The time threshold considered for deletion + */ + private def removeObsoleteConsumerGroups(zkClient: ZkClient, groupID: String, before: Long) + { + debug("Removing obsolete consumer groups from group {0} since {1}".format(groupID, before)) + val dirs: ZKGroupDirs = new ZKGroupDirs(groupID) + + if(!pathExists(zkClient, dirs.consumerGroupDir)) { + warn("Path " + dirs.consumerGroupDir + " doesn't exist on the zookeeper instance. 
Aborted.") + return + } + + if(checkIfLiveConsumers(zkClient, groupID)) { + warn("Aborted because active consumers were found in the group.") + return + } + + if(before == -1) { + // If no threshold for time provided, delete entire group + deleteZNodeRecursive(zkClient, dirs.consumerGroupDir) + } + else { + val childTopics = getChildren(zkClient, dirs.consumerGroupDir + "/offsets") + var numChildrenTopics:Int = childTopics.length + + for(topic <- childTopics) { + val topicPaths = new ZKGroupTopicDirs(groupID, topic) + numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) + } + deleteGroupIfNoTopics(zkClient, dirs, numChildrenTopics) + } + } + + /** + * Scans all the groups to find those which have the required topic. If the topic is present and + * there are no active consumers in that group, it would perform the deletion considering the time + * threshold + * + * @param zkClient Zookeeper client object which is already connected + * @param topic The topic + * @param before The time threshold considered for deletion + */ + private def removeObsoleteConsumerTopics(zkClient: ZkClient, topic: String, before: Long) + { + debug("Removing obsolete consumer groups from topic {0} before {1}".format(topic, before)) + + val stat = new Stat() + val childGroups = getChildren(zkClient, ZkUtils.ConsumersPath) + + for(group <- childGroups) { + val topicPaths = new ZKGroupTopicDirs(group, topic) + + if(pathExists(zkClient, topicPaths.consumerOffsetDir) && !checkIfLiveConsumers(zkClient, group)) { + zkClient.readData(topicPaths.consumerGroupDir + "/offsets", stat) + var numChildrenTopics: Int = stat.getNumChildren() + + if(before == -1) { // If no time threshold provided, delete entire topic + deleteZNodeRecursive(zkClient, topicPaths.consumerOffsetDir) + numChildrenTopics = numChildrenTopics - 1 + } + else { + numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) + } + 
deleteGroupIfNoTopics(zkClient, topicPaths, numChildrenTopics) + } + } + } + + /** + * Scans the "broker-partition" entries inside a topic and performs deletion if there were no + * updates since the threshold date provided. Also, deletes the topic itself if there are no more + * child entries under it. + * + * @param zkClient Zookeeper client object which is already connected + * @param dirs A convenient ZKGroupTopicDirs object for quick access to paths + * @param before The time threshold considered for deletion + */ + private def removeBrokerPartitionPairs(zkClient: ZkClient, dirs: ZKGroupTopicDirs, before: Long, + numChildrenTopics: Int) : Int = { + debug("Removing broker partition pairs {0} before {1}".format(dirs, before)) + + val stat = new Stat() + val childBrokerPartitionPair = getChildren(zkClient, dirs.consumerOffsetDir) + var numChildrenPairs: Int = childBrokerPartitionPair.length + + for (brokerPartitionPair <- childBrokerPartitionPair) { + val brokerPartitionPath: String = dirs.consumerOffsetDir + "/" + brokerPartitionPair + zkClient.readData(brokerPartitionPath, stat) + debug("Modified time for " + brokerPartitionPath + " is " + stat.getMtime()) + + // delete the node if was never modified after 'before', the threshold timestamp + if (stat.getMtime() < before) { + deleteZNode(zkClient, brokerPartitionPath) + numChildrenPairs = numChildrenPairs - 1 + } + } + + // if the topic is empty, then we can delete it + if (numChildrenPairs == 0) { + deleteZNode(zkClient, dirs.consumerOffsetDir) + (numChildrenTopics - 1) + } else { + numChildrenTopics + } + } + + /** + * If there are no more topics under this group, delete the entire group + * + * @param zkClient Zookeeper client object which is already connected + * @param dirs A convenient ZKGroupDirs object for quick access to consumer paths + * @param numTopics Number of topics present + */ + private def deleteGroupIfNoTopics(zkClient: ZkClient, dirs: ZKGroupDirs, numTopics: Int) { + if(numTopics == 0) { + 
debug("No topics left in the group \"" + dirs.consumerGroupDir + "\".") + deleteZNodeRecursive(zkClient, dirs.consumerGroupDir) + } + } + + /** + * Performs a defensive check to ensure that there are no consumers currently registered under the group + * + * @param zkClient Zookeeper client object which is already connected + * @param groupID The group-ID + * @return Boolean indicating if there are live consumers or not + */ + private def checkIfLiveConsumers(zkClient: ZkClient, groupID: String) : Boolean = { + val activeConsumers = getConsumersInGroup(zkClient, groupID) + + if(!activeConsumers.isEmpty) { + warn("The group \"" + groupID + "\" has active consumer(s).") + true + } else { + debug("No live consumers found for group \"" + groupID + "\".") + false + } + } + + /** + * Delete a znode if "dry-run" is off. + * + * @param zkClient Zookeeper client object which is already connected + * @param path The path of znode to be deleted + */ + private def deleteZNode(zkClient: ZkClient, path: String) { + info("Deleting \"" + path + "\".") + if(!dryRun) + deletePath(zkClient, path) + } + + /** + * Delete a znode recursively if "dry-run" is off. + * + * @param zkClient Zookeeper client object which is already connected + * @param path The path of znode to be deleted + */ + private def deleteZNodeRecursive(zkClient: ZkClient, path: String) { + info("Deleting \"" + path + "\" recursively.") + if(!dryRun) + deletePathRecursive(zkClient, path) + } + + /** + * Parse the input arguments and set the member variables + * + * @param args Input arguments to the tool + */ + private def parseArguments(args: Array[String]) { + val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" + val dateFormat = new SimpleDateFormat(dateFormatString) + + val parser = new OptionParser + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED parameter. 
The connection string " + + "(host:port) for zookeeper connection.") + .withRequiredArg + .describedAs("zookeeper") + .ofType(classOf[String]) + + val deleteByOpt = parser.accepts("delete-by", "REQUIRED parameter. Valid values are: " + + "\"topic\" and \"group\" indicate deletion based on " + + "a specific topic/group respectively.") + .withRequiredArg + .describedAs("delete-by") + .ofType(classOf[String]) + + val topicOrGroupOpt = parser.accepts("name", "REQUIRED parameter. Provide the " + + "name of the topic or group, based on \"delete-by\", " + + "which you want to cleanup.") + .withRequiredArg + .describedAs("name") + .ofType(classOf[String]) + + val beforeOpt = parser.accepts("before", "REQUIRED parameter. Date and time used as a threshold. " + + "Znodes with modified time before this timestamp are deleted. " + + "Formatted as " + dateFormatString + ", or use \"-1\" to skip the check.") + .withRequiredArg + .describedAs("before") + .ofType(classOf[String]) + + val dryRunOpt = parser.accepts("dry-run", "OPTIONAL parameter. Passing \"--dry-run\" will " + + "cause this tool to run in passive mode w/o actually deleting " + + "anything from zookeeper but logging all the activities that it " + + "would perform. 
It is highly recommended to use this setting if " + + "you don't want to risk deleting things and just want to see " + + "znodes which are eligible for deletion.") + + val options = parser.parse(args : _*) + + CommandLineUtils.checkRequiredArgs(parser, options, deleteByOpt, topicOrGroupOpt, zkConnectOpt, beforeOpt) + + deleteBy = options.valueOf(deleteByOpt) + topicOrGroupName = options.valueOf(topicOrGroupOpt) + zkConnect = options.valueOf(zkConnectOpt) + val beforeStr = options.valueOf(beforeOpt) + before = + if (beforeStr == "-1") { + -1 + } + else { + try { + dateFormat.parse(beforeStr).getTime() + } catch { + case e: ParseException => { + System.err.println("\"before\" parameter could not be parsed, should be formatted as " + dateFormatString) + System.exit(1) + throw e + } + } + } + + if (options.has(dryRunOpt)) { + dryRun = true + this.logIdent = "[dry-run] " + } + } +} -- 2.1.2 From 524422896155d5c8a1adb353bad3e2eddf3b334c Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Thu, 23 Oct 2014 15:47:44 -0700 Subject: [PATCH 4/4] Only remove partitions from a group if all partitions were last modified before the threshold date. 
--- .../kafka/tools/CleanupObsoleteZkEntries.scala | 29 ++++++++-------------- 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala index 4271e90..3fa8297 100644 --- a/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala +++ b/core/src/main/scala/kafka/tools/CleanupObsoleteZkEntries.scala @@ -119,7 +119,7 @@ object CleanupObsoleteZkEntries extends Logging { for(topic <- childTopics) { val topicPaths = new ZKGroupTopicDirs(groupID, topic) - numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) + numChildrenTopics = removeUnconsumedTopicsFromGroup(zkClient, topicPaths, before, numChildrenTopics) } deleteGroupIfNoTopics(zkClient, dirs, numChildrenTopics) } @@ -153,7 +153,7 @@ object CleanupObsoleteZkEntries extends Logging { numChildrenTopics = numChildrenTopics - 1 } else { - numChildrenTopics = removeBrokerPartitionPairs(zkClient, topicPaths, before, numChildrenTopics) + numChildrenTopics = removeUnconsumedTopicsFromGroup(zkClient, topicPaths, before, numChildrenTopics) } deleteGroupIfNoTopics(zkClient, topicPaths, numChildrenTopics) } @@ -161,37 +161,30 @@ object CleanupObsoleteZkEntries extends Logging { } /** - * Scans the "broker-partition" entries inside a topic and performs deletion if there were no - * updates since the threshold date provided. Also, deletes the topic itself if there are no more - * child entries under it. - * + * Deletes a topic from within a group if there have been no updates to any partitions since the + * date provided. 
+ * * @param zkClient Zookeeper client object which is already connected * @param dirs A convenient ZKGroupTopicDirs object for quick access to paths * @param before The time threshold considered for deletion */ - private def removeBrokerPartitionPairs(zkClient: ZkClient, dirs: ZKGroupTopicDirs, before: Long, + private def removeUnconsumedTopicsFromGroup(zkClient: ZkClient, dirs: ZKGroupTopicDirs, before: Long, numChildrenTopics: Int) : Int = { debug("Removing broker partition pairs {0} before {1}".format(dirs, before)) val stat = new Stat() val childBrokerPartitionPair = getChildren(zkClient, dirs.consumerOffsetDir) - var numChildrenPairs: Int = childBrokerPartitionPair.length - for (brokerPartitionPair <- childBrokerPartitionPair) { + val allChildrenObsolete = childBrokerPartitionPair.forall(brokerPartitionPair => { val brokerPartitionPath: String = dirs.consumerOffsetDir + "/" + brokerPartitionPair zkClient.readData(brokerPartitionPath, stat) debug("Modified time for " + brokerPartitionPath + " is " + stat.getMtime()) - - // delete the node if was never modified after 'before', the threshold timestamp - if (stat.getMtime() < before) { - deleteZNode(zkClient, brokerPartitionPath) - numChildrenPairs = numChildrenPairs - 1 - } - } + (stat.getMtime() < before) + }) // if the topic is empty, then we can delete it - if (numChildrenPairs == 0) { - deleteZNode(zkClient, dirs.consumerOffsetDir) + if (allChildrenObsolete) { + deleteZNodeRecursive(zkClient, dirs.consumerOffsetDir) (numChildrenTopics - 1) } else { numChildrenTopics -- 2.1.2