Index: core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
===================================================================
--- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala	(revision 1303446)
+++ core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala	(working copy)
@@ -23,6 +23,8 @@
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
 import collection.mutable.Map
+import scala.util.control.Breaks._
+
 object ConsumerOffsetChecker extends Logging {
 
   private val consumerMap: Map[String, Option[SimpleConsumer]] = Map()
@@ -45,15 +47,19 @@
   }
 
   private def processPartition(zkClient: ZkClient,
-                               group: String, topic: String, bidPid: String) {
+                               group: String, topic: String, bidPid: String,
+                               outputJSON: Boolean) {
     val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
             format(group, topic, bidPid)).toLong
     val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
             format(group, topic, bidPid))
-    println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
-    println("%20s%s".format("Owner = ", owner))
-    println("%20s%d".format("Consumer offset = ", offset))
-    println("%20s%,d (%,.2fG)".format("= ", offset, offset / math.pow(1024, 3)))
+    val bid,pid=bidPid.split("-")
+    if(!outputJSON) {
+      println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
+      println("%20s%s".format("Owner = ", owner))
+      println("%20s%d".format("Consumer offset = ", offset))
+      println("%20s%,d (%,.2fG)".format("= ", offset, offset / math.pow(1024, 3)))
+    }
 
     bidPid match {
       case BidPidPattern(bid, pid) =>
@@ -61,15 +67,20 @@
           bid, getConsumer(zkClient, bid))
         consumerOpt match {
           case Some(consumer) =>
-            val logSize =
-              consumer.getOffsetsBefore(topic, pid.toInt, -1, 1).last.toLong
-            println("%20s%d".format("Log size = ", logSize))
-            println("%20s%,d (%,.2fG)".format("= ", logSize, logSize / math.pow(1024, 3)))
+            val logSize = consumer.getOffsetsBefore(topic, pid.toInt, -1, 1).last.toLong
+            val logSizeG = logSize / math.pow(1024, 3)
+            val lag = logSize - offset
+            val lagG = lag / math.pow(1024, 3)
 
-            val lag = logSize - offset
-            println("%20s%d".format("Consumer lag = ", lag))
-            println("%20s%,d (%,.2fG)".format("= ", lag, lag / math.pow(1024, 3)))
-            println()
+            if(!outputJSON) {
+              println("%20s%d".format("Log size = ", logSize))
+              println("%20s%,d (%,.2fG)".format("= ", logSize, logSizeG))
+              println("%20s%d".format("Consumer lag = ", lag))
+              println("%20s%,d (%,.2fG)".format("= ", lag, lagG))
+              println()
+            } else {
+              println("{\"group\": \"%s\", \"topic\": \"%s\", \"brokerid\": \"%s\", \"partition\": \"%s\", \"owner\": \"%s\", \"consumeroffset\": \"%s\", \"logsize\": \"%s\", \"lag\": \"%s\"}".format(group,topic,bid,pid,owner,offset,logSize,lag))
+            }
           case None => // ignore
         }
       case _ =>
@@ -77,11 +88,11 @@
     }
   }
 
-  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
+  private def processTopic(zkClient: ZkClient, group: String, topic: String, outputJSON: Boolean) {
     val bidsPids = ZkUtils.getChildrenParentMayNotExist(
       zkClient, "/consumers/%s/offsets/%s".format(group, topic)).toList
     bidsPids.sorted.foreach {
-      bidPid => processPartition(zkClient, group, topic, bidPid)
+      bidPid => processPartition(zkClient, group, topic, bidPid, outputJSON)
     }
   }
 
@@ -105,6 +116,10 @@
             withRequiredArg().ofType(classOf[String])
     val groupOpt = parser.accepts("group", "Consumer group.").
             withRequiredArg().ofType(classOf[String])
+    val loopIntervalOpt = parser.accepts("loop", "Loop interval (in seconds, greater than 0)").
+            withRequiredArg().ofType(classOf[String])
+    val outputJSONOpt = parser.accepts("asjson", "JSON Output")
+
     parser.accepts("help", "Print this message.")
 
     val options = parser.parse(args : _*)
@@ -121,6 +136,20 @@
         System.exit(1)
       }
 
+    var outputJSON = if(options.has("asjson")) true
+      else false
+
+    var loopInterval: Int = 0
+    loopInterval = if (options.has(loopIntervalOpt)) options.valueOf(loopIntervalOpt).toInt
+      else 0
+
+    if(loopInterval < 0) {
+			System.err.println("Loop value must be greater than 0: %s".format(options.valueOf(loopIntervalOpt)))
+      parser.printHelpOn(System.err)
+      System.exit(1)
+    }
+    loopInterval = loopInterval * 1000
+
     val zkConnect = options.valueOf(zkConnectOpt)
     val group = options.valueOf(groupOpt)
     val topics = if (options.has(topicsOpt)) Some(options.valueOf(topicsOpt))
@@ -140,11 +169,26 @@
       debug("zkConnect = %s; topics = %s; group = %s".format(
         zkConnect, topicList.toString(), group))
 
-      topicList.sorted.foreach {
-        topic => processTopic(zkClient, group, topic)
+      breakable {  
+        while(true) {
+          var stime = System.currentTimeMillis
+          topicList.sorted.foreach {
+            topic => processTopic(zkClient, group, topic, outputJSON)
+          }
+          var sltime = loopInterval - (System.currentTimeMillis - stime) 
+          if(loopInterval > 0) {
+            if(sltime < 0) {
+              debug("Long loop, this is bad...")
+            } else {
+              Thread.sleep(sltime)
+            }
+          } else {
+             break
+          }
+        }
       }
 
-      printBrokerInfo()
+      if(!outputJSON) printBrokerInfo()
     }
     finally {
       for (consumerOpt <- consumerMap.values) {
