From 01b6894e8d74d77696f7fec1652a05523438e10b Mon Sep 17 00:00:00 2001
From: Dave DeMaagd <ddemaagd@linkedin.com>
Date: Thu, 24 Jan 2013 12:34:52 -0800
Subject: [PATCH 1/2] KAFKA-733 add fat jar/assembly

---
 core/build.sbt      |   14 ++++++++++++++
 1 files changed, 14 insertions(+), 0 deletions(-)

diff --git a/core/build.sbt b/core/build.sbt
index 211aaf9..448c0c0 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -1,5 +1,6 @@
 import sbt._
 import Keys._
+import AssemblyKeys._
 
 name := "kafka"
 
@@ -24,4 +25,17 @@ libraryDependencies <<= (scalaVersion, libraryDependencies) { (sv, deps) =>
   })
 }
 
+assemblySettings
+
+test in assembly := {}
+
+mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
+  {
+    case PathList("META-INF", xs @ _*) => (xs map {_.toLowerCase}) match {
+      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
+      case _ => MergeStrategy.deduplicate
+    }
+    case _ => MergeStrategy.last
+  }
+}
 
-- 
1.7.1


From f4a64ffe8c50826ab95170573eead8bdb5c5e3d6 Mon Sep 17 00:00:00 2001
From: Dave DeMaagd <ddemaagd@linkedin.com>
Date: Thu, 24 Jan 2013 17:44:44 -0800
Subject: [PATCH 2/2] KAFKA-735 add looping/json output

---
 .../scala/kafka/tools/ConsumerOffsetChecker.scala  |   78 ++++++++++++++++----
 1 files changed, 64 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 3161435..fa02b33 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -25,7 +25,7 @@ import kafka.consumer.SimpleConsumer
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.TopicAndPartition
 import scala.collection._
-
+import scala.util.control.Breaks._
 
 object ConsumerOffsetChecker extends Logging {
 
@@ -47,7 +47,7 @@ object ConsumerOffsetChecker extends Logging {
   }
 
   private def processPartition(zkClient: ZkClient,
-                               group: String, topic: String, pid: Int) {
+                               group: String, topic: String, pid: Int, asJson: Boolean) {
     val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
             format(group, topic, pid))._1.toLong
     val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
@@ -64,8 +64,25 @@ object ConsumerOffsetChecker extends Logging {
             val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
 
             val lag = logSize - offset
-            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag,
-              owner match {case Some(ownerStr) => ownerStr case None => "none"}))
+
+            if (asJson) {
+              // Resolve this partition's broker host/port from the consumer connection.
+              var bhost = "UNKNOWN"
+              var bport = -1
+              consumerOpt match {
+                case Some(consumer) =>
+                  bhost = consumer.host
+                  bport = consumer.port
+                case None => // ignore
+              }
+
+              println("{\"group\": \"%s\", \"topic\": \"%s\", \"bid\": \"%s\", \"brokerhost\": \"%s\", \"brokerport\": \"%s\", \"pid\": \"%s\", \"consumeroffset\": \"%s\", \"logsize\": \"%s\", \"lag\": \"%s\", \"owner\": \"%s\"}".format(group,
+                topic, bid, bhost, bport, pid, offset, logSize, lag, owner match {case Some(ownerStr) => ownerStr case None => "none"}))
+            } else {
+              println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag,
+                owner match {case Some(ownerStr) => ownerStr case None => "none"}))
+            }
+
             consumer.close()
           case None => // ignore
         }
@@ -74,12 +91,12 @@ object ConsumerOffsetChecker extends Logging {
     }
   }
 
-  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
+  private def processTopic(zkClient: ZkClient, group: String, topic: String, asJson: Boolean) {
     val pidMap = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))
     pidMap.get(topic) match {
       case Some(pids) =>
         pids.sorted.foreach {
-          pid => processPartition(zkClient, group, topic, pid)
+          pid => processPartition(zkClient, group, topic, pid, asJson)
         }
       case None => // ignore
     }
@@ -105,6 +122,10 @@ object ConsumerOffsetChecker extends Logging {
             withRequiredArg().ofType(classOf[String])
     val groupOpt = parser.accepts("group", "Consumer group.").
             withRequiredArg().ofType(classOf[String])
+    val loopIntervalOpt = parser.accepts("loop", "Loop interval (in seconds, greater than 0)").
+            withRequiredArg().ofType(classOf[String])
+
+    parser.accepts("asjson", "Json Output")
     parser.accepts("broker-info", "Print broker info")
     parser.accepts("help", "Print this message.")
 
@@ -126,7 +147,18 @@
     val group = options.valueOf(groupOpt)
     val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt))
       else None
-
+    val asJson = options.has("asjson")
+    // Loop interval in milliseconds; 0 means run once (no looping).
+    val loopInterval: Int =
+      if (options.has(loopIntervalOpt)) {
+        val intervalSecs = options.valueOf(loopIntervalOpt).toInt
+        if (intervalSecs <= 0) {
+          System.err.println("Loop value must be greater than 0: %s".format(options.valueOf(loopIntervalOpt)))
+          parser.printHelpOn(System.err)
+          System.exit(1)
+        }
+        intervalSecs * 1000
+      } else 0
 
     var zkClient: ZkClient = null
     try {
@@ -141,14 +173,32 @@ object ConsumerOffsetChecker extends Logging {
       debug("zkConnect = %s; topics = %s; group = %s".format(
         zkConnect, topicList.toString(), group))
 
-      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
-      topicList.sorted.foreach {
-        topic => processTopic(zkClient, group, topic)
+      breakable {
+        while(true) {
+          val stime = System.currentTimeMillis
+
+          if (!asJson) {
+            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
+          }
+          topicList.sorted.foreach {
+            topic => processTopic(zkClient, group, topic, asJson)
+          }
+
+          if (!asJson && options.has("broker-info"))
+            printBrokerInfo();
+          val sltime = loopInterval - (System.currentTimeMillis - stime)
+          if(loopInterval > 0) {
+            if(sltime < 0) {
+              warn("Collection time (%s ms) greater than loop interval (%s ms)".format((-1*sltime), loopInterval))
+            } else {
+              debug("Sleeping %s ms".format(sltime))
+              Thread.sleep(sltime)
+            }
+          } else {
+             break
+          }
+        }
       }
-
-      if (options.has("broker-info"))
-        printBrokerInfo();
-
     }
     finally {
       for (consumerOpt <- consumerMap.values) {
-- 
1.7.1

