Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-589

Clean shutdown after startup connection failure


    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 0.7.2, 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:



      I'm embedding the kafka server (0.7.2) in an application container. I've noticed that if I try to start the server without zookeeper being available, by default it gets a zk connection timeout after 6 seconds, and then throws an Exception out of KafkaServer.startup()....E.g., I see this stack trace:

      Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
      at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
      at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
      at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
      at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
      at kafka.log.LogManager.<init>(LogManager.scala:93)
      at kafka.server.KafkaServer.startup(KafkaServer.scala:58)

      So that's ok, I can catch the exception, and then shut everything down gracefully, in this case. However, when I do this, it seems there is a daemon thread still around, which doesn't quit, and so the server never actually exits the jvm. Specifically, this thread seems to hang around:

      "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on condition [112c07000]
      java.lang.Thread.State: TIMED_WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <7f40d4be8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
        at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:680)

      Looking at the code in kafka.log.LogManager(), it does seem like it starts up the scheduler to clean logs, before then trying to connect to zk (and in this case fail):

      /* Schedule the cleanup task to delete old logs */
      if(scheduler != null)

      { info("starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) }

      So this scheduler does not appear to be stopped if startup fails. However, if I catch the above RuntimeException, and then call KafkaServer.shutdown(), then it will stop the scheduler, and all is good.

      However, it seems odd that if I get an exception when calling KafkaServer.startup(), that I should still have to do a KafkaServer.shutdown(). Rather, wouldn't it be better to have it internally cleanup after itself if startup() gets an exception? I'm not sure I can reliably call shutdown() after a failed startup()....


        1. KAFKA-589-v1.patch
          11 kB
          Ewen Cheslack-Postava



            • Assignee:
              ewencp Ewen Cheslack-Postava
              jbrosenberg Jason Rosenberg
            • Votes:
              0 Vote for this issue
              6 Start watching this issue


              • Created: