From d7adedf254040610f060f95aec944b0ff69aa76f Mon Sep 17 00:00:00 2001
From: Ewen Cheslack-Postava <me@ewencp.org>
Date: Wed, 24 Sep 2014 21:06:38 -0700
Subject: [PATCH] KAFKA-589 Clean up after failed KafkaServer startup

Make KafkaServer call shutdown() if it throws an exception during startup(). The original
exception is rethrown, so existing code expecting the exception will continue to work
correctly, but non-daemon threads left behind by the partial startup will now also be
cleaned up.
---
 core/src/main/scala/kafka/server/KafkaServer.scala | 153 +++++++++++----------
 .../scala/kafka/server/KafkaServerStartable.scala  |   4 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |  27 ++++
 3 files changed, 113 insertions(+), 71 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 390fef5..a085429 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -69,62 +69,70 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
-    info("starting")
-    brokerState.newState(Starting)
-    isShuttingDown = new AtomicBoolean(false)
-    shutdownLatch = new CountDownLatch(1)
+    try {
+      info("starting")
+      brokerState.newState(Starting)
+      isShuttingDown = new AtomicBoolean(false)
+      shutdownLatch = new CountDownLatch(1)
 
-    /* start scheduler */
-    kafkaScheduler.startup()
+      /* start scheduler */
+      kafkaScheduler.startup()
     
-    /* setup zookeeper */
-    zkClient = initZk()
+      /* setup zookeeper */
+      zkClient = initZk()
 
-    /* start log manager */
-    logManager = createLogManager(zkClient, brokerState)
-    logManager.startup()
+      /* start log manager */
+      logManager = createLogManager(zkClient, brokerState)
+      logManager.startup()
 
-    socketServer = new SocketServer(config.brokerId,
-                                    config.hostName,
-                                    config.port,
-                                    config.numNetworkThreads,
-                                    config.queuedMaxRequests,
-                                    config.socketSendBufferBytes,
-                                    config.socketReceiveBufferBytes,
-                                    config.socketRequestMaxBytes,
-                                    config.maxConnectionsPerIp,
-                                    config.connectionsMaxIdleMs)
-    socketServer.startup()
+      socketServer = new SocketServer(config.brokerId,
+                                      config.hostName,
+                                      config.port,
+                                      config.numNetworkThreads,
+                                      config.queuedMaxRequests,
+                                      config.socketSendBufferBytes,
+                                      config.socketReceiveBufferBytes,
+                                      config.socketRequestMaxBytes,
+                                      config.maxConnectionsPerIp,
+                                      config.connectionsMaxIdleMs)
+      socketServer.startup()
 
-    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
+      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
 
-    /* start offset manager */
-    offsetManager = createOffsetManager()
+      /* start offset manager */
+      offsetManager = createOffsetManager()
 
-    kafkaController = new KafkaController(config, zkClient, brokerState)
+      kafkaController = new KafkaController(config, zkClient, brokerState)
     
-    /* start processing requests */
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
-    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
-    brokerState.newState(RunningAsBroker)
+      /* start processing requests */
+      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
+      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
+      brokerState.newState(RunningAsBroker)
    
-    Mx4jLoader.maybeLoad()
+      Mx4jLoader.maybeLoad()
 
-    replicaManager.startup()
+      replicaManager.startup()
 
-    kafkaController.startup()
+      kafkaController.startup()
     
-    topicConfigManager = new TopicConfigManager(zkClient, logManager)
-    topicConfigManager.startup()
+      topicConfigManager = new TopicConfigManager(zkClient, logManager)
+      topicConfigManager.startup()
     
-    /* tell everyone we are alive */
-    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
-    kafkaHealthcheck.startup()
+      /* tell everyone we are alive */
+      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+      kafkaHealthcheck.startup()
 
     
-    registerStats()
-    startupComplete.set(true)
-    info("started")
+      registerStats()
+      startupComplete.set(true)
+      info("started")
+    }
+    catch {
+      case e: Throwable =>
+        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
+        shutdown()
+        throw e
+    }
   }
   
   private def initZk(): ZkClient = {
@@ -236,35 +244,42 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
    */
   def shutdown() {
-    info("shutting down")
-    val canShutdown = isShuttingDown.compareAndSet(false, true)
-    if (canShutdown) {
-      Utils.swallow(controlledShutdown())
-      brokerState.newState(BrokerShuttingDown)
-      if(kafkaHealthcheck != null)
-        Utils.swallow(kafkaHealthcheck.shutdown())
-      if(socketServer != null)
-        Utils.swallow(socketServer.shutdown())
-      if(requestHandlerPool != null)
-        Utils.swallow(requestHandlerPool.shutdown())
-      if(offsetManager != null)
-        offsetManager.shutdown()
-      Utils.swallow(kafkaScheduler.shutdown())
-      if(apis != null)
-        Utils.swallow(apis.close())
-      if(replicaManager != null)
-        Utils.swallow(replicaManager.shutdown())
-      if(logManager != null)
-        Utils.swallow(logManager.shutdown())
-      if(kafkaController != null)
-        Utils.swallow(kafkaController.shutdown())
-      if(zkClient != null)
-        Utils.swallow(zkClient.close())
+    try {
+      info("shutting down")
+      val canShutdown = isShuttingDown.compareAndSet(false, true)
+      if (canShutdown) {
+        Utils.swallow(controlledShutdown())
+        brokerState.newState(BrokerShuttingDown)
+        if(kafkaHealthcheck != null)
+          Utils.swallow(kafkaHealthcheck.shutdown())
+        if(socketServer != null)
+          Utils.swallow(socketServer.shutdown())
+        if(requestHandlerPool != null)
+          Utils.swallow(requestHandlerPool.shutdown())
+        if(offsetManager != null)
+          offsetManager.shutdown()
+        Utils.swallow(kafkaScheduler.shutdown())
+        if(apis != null)
+          Utils.swallow(apis.close())
+        if(replicaManager != null)
+          Utils.swallow(replicaManager.shutdown())
+        if(logManager != null)
+          Utils.swallow(logManager.shutdown())
+        if(kafkaController != null)
+          Utils.swallow(kafkaController.shutdown())
+        if(zkClient != null)
+          Utils.swallow(zkClient.close())
 
-      brokerState.newState(NotRunning)
-      shutdownLatch.countDown()
-      startupComplete.set(false)
-      info("shut down completed")
+        brokerState.newState(NotRunning)
+        shutdownLatch.countDown()
+        startupComplete.set(false)
+        info("shut down completed")
+      }
+    }
+    catch {
+      case e: Throwable =>
+        fatal("Fatal error during KafkaServer shutdown.", e)
+        throw e
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index 28658bb..cd64bbe 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -29,8 +29,8 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
     }
     catch {
       case e: Throwable =>
-        fatal("Fatal error during KafkaServerStable startup. Prepare to shutdown", e)
-        shutdown()
+        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
+        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
         System.exit(1)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index ab60e9b..3804a11 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -114,6 +114,33 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     verifyNonDaemonThreadsStatus
   }
 
+  @Test
+  def testCleanShutdownAfterFailedStartup() {
+    val newProps = TestUtils.createBrokerConfig(0, port)
+    newProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535")
+    val newConfig = new KafkaConfig(newProps)
+    var server = new KafkaServer(newConfig)
+    try {
+      server.startup()
+      fail("Expected KafkaServer setup to fail, throw exception")
+    }
+    catch {
+      // Try to clean up carefully without hanging even if the test fails. This means trying to accurately
+      // identify the correct exception, making sure the server was shutdown, and cleaning up if anything
+      // goes wrong so that awaitShutdown doesn't hang
+      case e: org.I0Itec.zkclient.exception.ZkException =>
+        assertEquals(server.brokerState.currentState, NotRunning.state)
+        if (server.brokerState.currentState != NotRunning.state)
+          server.shutdown()
+      case e: Throwable =>
+        fail("Expected KafkaServer setup to fail with connection exception but caught a different exception.")
+        server.shutdown()
+    }
+    server.awaitShutdown()
+    Utils.rm(server.config.logDirs)
+    verifyNonDaemonThreadsStatus
+  }
+
   def verifyNonDaemonThreadsStatus() {
     assertEquals(0, Thread.getAllStackTraces.keySet().toArray
       .map(_.asInstanceOf[Thread])
-- 
1.9.1

