commit 5768dcf5d533777e81d568f18b615271fd94484e
Author: Tejas Patil <tejasp@apache.org>
Date:   Wed Jul 3 18:52:34 2013 -0700

    Fix for KAFKA-559

diff --git a/bin/delete-obsolete-consumer-groups.sh b/bin/delete-obsolete-consumer-groups.sh
new file mode 100755
index 0000000..3a0d7bf
--- /dev/null
+++ b/bin/delete-obsolete-consumer-groups.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Launches kafka.tools.RemoveObsoleteConsumerGroups through kafka-run-class.sh, forwarding all
+# command-line arguments unchanged.
+base_dir=$(dirname "$0")
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
+# Quote the script path and "$@" so that paths and arguments containing whitespace are not split.
+"$base_dir/kafka-run-class.sh" kafka.tools.RemoveObsoleteConsumerGroups "$@"
+
diff --git a/core/src/main/scala/kafka/tools/RemoveObsoleteConsumerGroups.scala b/core/src/main/scala/kafka/tools/RemoveObsoleteConsumerGroups.scala
new file mode 100644
index 0000000..5d95606
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/RemoveObsoleteConsumerGroups.scala
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple.OptionParser
+import kafka.utils._
+import kafka.utils.ZkUtils._
+import org.apache.zookeeper.data.Stat
+import org.I0Itec.zkclient.ZkClient
+
+/**
+ * Command-line tool that removes the Zookeeper state left behind by obsolete consumer groups.
+ * Many use cases involve transient consumers. These consumers create entries under their consumer
+ * group in Zookeeper and maintain offsets there as well. This tool deletes either the whole group
+ * or only the offset znodes that were not modified since a given time; it aborts when the group
+ * still has active consumers.
+ */
+object RemoveObsoleteConsumerGroups extends Logging {
+
+  private val ZkSessionTimeout: Int = 3000
+  private val ZkConnectionTimeout: Int = 3000
+
+  // State filled in by parseArguments() and removeObsoleteConsumerGroups(); it is mutable because
+  // the tool is a one-shot command-line program driven from main().
+  private var zkClient: ZkClient = null
+  private var groupID: String = null
+  private var zkConnect: String = null
+  private var since: Long = 0
+  private var dryRun: Boolean = false
+
+  /**
+   * Entry point: parses the arguments, performs the removal and closes the Zookeeper connection
+   * when the run ends. Exits with status 1 when the removal fails.
+   */
+  def main(args: Array[String]) {
+    var failed = false
+    try {
+      parseArguments(args)
+      removeObsoleteConsumerGroups(zkConnect, groupID, since)
+    } catch {
+      // last-resort handler of a command-line tool: report whatever went wrong
+      case e: Throwable =>
+        println("removal failed because of " + e.getMessage)
+        println(Utils.stackTrace(e))
+        failed = true
+    } finally {
+      closeZkClient()
+    }
+    // a failed run must be visible to calling scripts through the exit status
+    if (failed)
+      System.exit(1)
+  }
+
+  /**
+   * Connects to Zookeeper and removes the obsolete state of one consumer group.
+   *
+   * @param zkConnect connection string (host:port) of the Zookeeper instance
+   * @param groupID   consumer group to clean up
+   * @param since     threshold timestamp; 0 deletes the whole group, any other value deletes only
+   *                  the offset znodes whose modification time is older than it
+   */
+  def removeObsoleteConsumerGroups(zkConnect: String, groupID: String, since: Long) {
+    info("Connecting to zookeeper instance at " + zkConnect)
+    zkClient = new ZkClient(zkConnect, ZkSessionTimeout, ZkConnectionTimeout, ZKStringSerializer)
+
+    // derive the path from the argument instead of state set by parseArguments(), so that this
+    // method also works when it is called directly
+    val groupDir = new ZKGroupDirs(groupID).consumerGroupDir
+    exitIfNoPathExists(groupDir)
+    exitIfLiveConsumers(groupID)
+
+    if (since == 0) {
+      // If no date provided, delete entire group
+      debug("Argument \"since\" is empty. Using default value 0.")
+      deleteGroup(groupDir)
+    } else {
+      deleteStaleOffsets(groupID, since)
+    }
+    info("Removal has successfully completed.")
+  }
+
+  /** Logs a warning and exits with status 1 when the given znode path does not exist. */
+  def exitIfNoPathExists(consumerGroupDir: String) {
+    if (!pathExists(zkClient, consumerGroupDir))
+      abort("Path \"" + consumerGroupDir + "\" doesn't exist on the zookeeper instance. Aborted.")
+  }
+
+  /** Logs a warning and exits with status 1 when the group still has active consumers. */
+  def exitIfLiveConsumers(groupID: String) {
+    // check if there are no live instances under "/consumers/[group]/ids/"
+    val activeConsumers = getConsumersInGroup(zkClient, groupID)
+    if (!activeConsumers.isEmpty)
+      abort("The group \"" + groupID + "\" has active consumer(s). Aborted.")
+    debug("No live consumers found for group \"" + groupID + "\".")
+  }
+
+  /**
+   * Deletes every offset znode of the group that was last modified before the threshold, then
+   * deletes the topic and "offsets" znodes left without children and, once "offsets" is gone,
+   * the whole group.
+   * @param groupID consumer group to clean up
+   * @param since   threshold compared against the znode's mtime (milliseconds since the epoch)
+   */
+  def deleteStaleOffsets(groupID: String, since: Long) {
+    val groupDir = new ZKGroupDirs(groupID).consumerGroupDir
+    val offsetsDir = groupDir + "/offsets"
+    val stat = new Stat()
+
+    for (topic <- getChildren(zkClient, offsetsDir)) {
+      val topicOffsetDir = new ZKGroupTopicDirs(groupID, topic).consumerOffsetDir
+      // readData is called only to fill in stat; the data stored in the znode is not used
+      zkClient.readData(topicOffsetDir, stat)
+      debug("modified time for " + topicOffsetDir + " is " + stat.getMtime())
+
+      for (brokerPartitionPair <- getChildren(zkClient, topicOffsetDir)) {
+        val brokerPartitionPath = topicOffsetDir + "/" + brokerPartitionPair
+        zkClient.readData(brokerPartitionPath, stat)
+        debug("modified time for " + brokerPartitionPath + " is " + stat.getMtime())
+
+        // delete the node if was never modified after 'since', the threshold timestamp
+        if (stat.getMtime() < since)
+          deleteZNode(zkClient, brokerPartitionPath)
+      }
+      // if the topic is empty, then we can delete it
+      deleteIfEmpty(zkClient, topicOffsetDir)
+    }
+    deleteIfEmpty(zkClient, offsetsDir)
+
+    // if the "offsets" is empty (and was therefore deleted above), delete the entire group
+    if (!pathExists(zkClient, offsetsDir))
+      deleteGroup(groupDir)
+  }
+
+  /** Deletes the znode at the given path when it has no children. */
+  def deleteIfEmpty(zkClient: ZkClient, path: String) {
+    val stat = new Stat()
+    // readData is called only to fill in stat with the znode's metadata
+    zkClient.readData(path, stat)
+    if (stat.getNumChildren() == 0)
+      deleteZNode(zkClient, path)
+  }
+
+  /** Deletes the znode at the given path, or only logs the deletion in dry-run mode. */
+  def deleteZNode(zkClient: ZkClient, path: String) {
+    if (dryRun) {
+      info("[dry-run] Deleting " + path)
+    } else {
+      info("Deleting " + path)
+      deletePath(zkClient, path)
+    }
+  }
+
+  /** Deletes the group's znode recursively, or only logs the deletion in dry-run mode. */
+  private def deleteGroup(groupDir: String) {
+    if (dryRun)
+      info("[dry-run] Deleting znode \"" + groupDir + "\" recursively.")
+    else
+      deletePathRecursive(zkClient, groupDir)
+  }
+
+  /** Closes the Zookeeper connection if one is open; calling it again is a no-op. */
+  private def closeZkClient() {
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+  }
+
+  /** Logs the message as a warning, closes the Zookeeper connection and exits with status 1. */
+  private def abort(message: String) {
+    warn(message)
+    // System.exit never returns, so the finally block in main() would not close the connection
+    closeZkClient()
+    System.exit(1)
+  }
+
+  /**
+   * Parses the command-line options into this object's fields. Prints the usage and exits with
+   * status 1 when a required option ("zookeeper" or "group-id") is missing.
+   */
+  def parseArguments(args: Array[String]) {
+    val parser = new OptionParser
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED parameter. The connection string (host:port) for zookeeper connection.")
+                           .withRequiredArg
+                           .describedAs("zookeeper")
+                           .ofType(classOf[String])
+
+    val groupIDOpt = parser.accepts("group-id", "REQUIRED parameter. The group-id used for removal.")
+                         .withRequiredArg
+                         .describedAs("group-id")
+                         .ofType(classOf[String])
+
+    val sinceOpt = parser.accepts("since", "OPTIONAL parameter. Time elapsed since the epoch (in milliseconds) considered as threshold. " +
+                                  "Znodes with modified time before this timestamp are deleted. (Default value is 0).")
+                       .withRequiredArg
+                       .describedAs("since")
+                       .ofType(classOf[String])
+
+    val dryRunOpt = parser.accepts("dry-run", "OPTIONAL parameter. Passing \"--dry-run\" or \"--dry-run true\" will cause this tool to run in " +
+                                   "passive mode w/o actually deleting anything from zookeeper but logging all the activities that " +
+                                   "it would perform. It is highly recommended to use this setting if you don't want to risk " +
+                                   "deleting things and just want to see znodes which are eligible for deletion.")
+                        .withOptionalArg
+                        .describedAs("dry-run")
+                        .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    for (arg <- List(groupIDOpt, zkConnectOpt)) {
+      if (!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    groupID = options.valueOf(groupIDOpt)
+    zkConnect = options.valueOf(zkConnectOpt)
+    // valueOf returns null when the option is absent, so fall back to the documented default 0
+    since = if (options.has(sinceOpt)) options.valueOf(sinceOpt).toLong else 0L
+
+    // "--dry-run" without a value must still be a dry run; only an explicit "false" turns it off
+    dryRun = options.has(dryRunOpt) && !"false".equalsIgnoreCase(options.valueOf(dryRunOpt))
+  }
+}
\ No newline at end of file
