From ad5dfa33c814b83b44d45bb352844306a2a4088e Mon Sep 17 00:00:00 2001
From: Brenden Matthews <brenden@diddyinc.com>
Date: Thu, 13 Feb 2014 18:21:54 -0800
Subject: [PATCH] Added WaitForReplication admin tool.

---
 .../scala/kafka/admin/WaitForReplication.scala     | 123 +++++++++++++++++++++
 .../kafka/server/AbstractFetcherManager.scala      |  16 ++-
 .../scala/kafka/server/ReplicaFetcherManager.scala |  21 +++-
 3 files changed, 151 insertions(+), 9 deletions(-)
 create mode 100644 core/src/main/scala/kafka/admin/WaitForReplication.scala

diff --git a/core/src/main/scala/kafka/admin/WaitForReplication.scala b/core/src/main/scala/kafka/admin/WaitForReplication.scala
new file mode 100644
index 0000000..e41e50a
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/WaitForReplication.scala
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import javax.management.remote.{JMXServiceURL, JMXConnectorFactory}
+import javax.management.JMX
+import javax.management.ObjectName
+import scala.Some
+import kafka.common.{TopicAndPartition, BrokerNotAvailableException}
+import kafka.server.{ReplicaFetcherManager, ReplicaFetcherManagerMBean}
+
+
+object WaitForReplication extends Logging {
+
+  private case class BrokerParams(zkConnect: String, brokerId: java.lang.Integer)
+
+  /**
+   * Performs a single check of the broker's replica fetcher lag.
+   * Reads the broker's host and JMX port from ZooKeeper, queries getMaxLag over JMX,
+   * and returns true only if it is 0; any failure is logged and yields false.
+   */
+  private def invokeTest(params: BrokerParams): Boolean = {
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(params.zkConnect, 30000, 30000, ZKStringSerializer)
+      val zkInfo = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + params.brokerId)._1
+      // A missing znode and unparseable registration data are reported the same way.
+      val (brokerHost, brokerJmxPort) = zkInfo.flatMap(data => Json.parseFull(data)) match {
+        case Some(m) =>
+          val brokerInfo = m.asInstanceOf[Map[String, Any]]
+          (brokerInfo("host").toString, brokerInfo("jmx_port").asInstanceOf[Int])
+        case None =>
+          throw new BrokerNotAvailableException("Broker id %d does not exist".format(params.brokerId))
+      }
+      val jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi".format(brokerHost, brokerJmxPort))
+      info("Connecting to jmx url " + jmxUrl)
+      val jmxc = JMXConnectorFactory.connect(jmxUrl, null)
+      try {
+        val proxy = JMX.newMBeanProxy(jmxc.getMBeanServerConnection,
+            new ObjectName(ReplicaFetcherManager.MBeanName), classOf[ReplicaFetcherManagerMBean])
+        val maxLag = proxy.getMaxLag
+        println(s"Max lag is currently ${maxLag}")
+        maxLag == 0
+      } finally {
+        // A new connector is opened on every attempt; close it so retries do not leak
+        // connections. A failed close must not turn a successful reading into a failure.
+        try jmxc.close() catch { case e: java.io.IOException => error("Failed to close jmx connection", e) }
+      }
+    } catch {
+      case t: Throwable =>
+        error("Operation failed due to broker failure", t)
+        false
+    } finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  /**
+   * Command-line entry point: checks the broker given by --broker (looked up through
+   * --zookeeper) and re-checks up to --num.retries times, pausing --retry.interval.ms
+   * (at least 1000 ms) before each retry, until the check succeeds.
+   */
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val brokerOpt = parser.accepts("broker", "REQUIRED: The broker to wait for.")
+            .withRequiredArg
+            .describedAs("Broker Id")
+            .ofType(classOf[java.lang.Integer])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
+            "Multiple URLS can be given to allow fail-over.")
+            .withRequiredArg
+            .describedAs("urls")
+            .ofType(classOf[String])
+    val numRetriesOpt = parser.accepts("num.retries", "Number of times to retry if replication has not caught up.")
+            .withRequiredArg
+            .describedAs("number of retries")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(10)
+    val retryIntervalOpt = parser.accepts("retry.interval.ms", "Retry interval if retries requested.")
+            .withRequiredArg
+            .describedAs("retry interval in ms (>= 1000)")
+            .ofType(classOf[java.lang.Integer])
+            .defaultsTo(10000)
+    val options = parser.parse(args : _*)
+    CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt)
+    // Intervals shorter than one second are raised to one second.
+    val retryIntervalMs = options.valueOf(retryIntervalOpt).intValue.max(1000)
+    val numRetries = options.valueOf(numRetriesOpt).intValue
+    val brokerParams = BrokerParams(options.valueOf(zkConnectOpt), options.valueOf(brokerOpt))
+
+    // exists stops at the first successful check, so no further sleeps or checks follow it.
+    val caughtUp = invokeTest(brokerParams) || (1 to numRetries).exists { attempt =>
+      info("Retry " + attempt)
+      try Thread.sleep(retryIntervalMs)
+      catch { case ie: InterruptedException => } // ignore
+      invokeTest(brokerParams)
+    }
+    if (!caughtUp)
+      error("Replication did not catch up after %d retries".format(numRetries))
+  }
+
+}
+
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 9390edf..1ef1980 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -33,15 +33,19 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
+  /** Returns the largest lag reported by any fetcher thread for any of its entries, or 0 if there is none. */
+  def getMaxLag() : Long =
+    fetcherThreadMap.foldLeft(0L) { case (maxSoFar, (_, fetcherThread)) =>
+      fetcherThread.fetcherLagStats.stats.foldLeft(maxSoFar) { case (threadMax, (_, lagMetrics)) =>
+        threadMax.max(lagMetrics.lag)
+      }
+    }
+
   newGauge(
     metricPrefix + "-MaxLag",
     new Gauge[Long] {
       // current max lag across all fetchers/topics/partitions
-      def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
-        fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => {
-          curMaxThread.max(fetcherLagStatsEntry._2.lag)
-        }).max(curMaxAll)
-      })
+      def value = getMaxLag
     }
   )
 
@@ -127,4 +131,4 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
 
 case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
 
-case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
\ No newline at end of file
+case class BrokerAndInitialOffset(broker: Broker, initOffset: Long)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 351dbba..bdbdd67 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -18,18 +18,33 @@
 package kafka.server
 
 import kafka.cluster.Broker
+import kafka.utils.Utils
+
+/** Management interface implemented by ReplicaFetcherManager; its getter is
+  * read through a JMX proxy by kafka.admin.WaitForReplication. */
+trait ReplicaFetcherManagerMBean { def getMaxLag(): Long }
+
+/** Holds the JMX object name under which the ReplicaFetcherManager class registers
+  * itself and which kafka.admin.WaitForReplication uses to build its proxy. */
+object ReplicaFetcherManager { val MBeanName = "kafka.server:type=ReplicaFetcherManager,name=ReplicaOps" }
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
         extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
-                                       "Replica", brokerConfig.numReplicaFetchers) {
+                                       "Replica", brokerConfig.numReplicaFetchers) with ReplicaFetcherManagerMBean {
+  Utils.registerMBean(this, ReplicaFetcherManager.MBeanName)
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)
   }
 
+  /** Implements ReplicaFetcherManagerMBean.getMaxLag by delegating to the inherited implementation. */
+  override def getMaxLag(): Long =
+    super.getMaxLag()
+
   def shutdown() {
     info("shutting down")
+    Utils.unregisterMBean(ReplicaFetcherManager.MBeanName)
     closeAllFetchers()
     info("shutdown completed")
-  }  
-}
\ No newline at end of file
+  }
+}
-- 
1.9.0

