Kafka: KAFKA-589

Clean shutdown after startup connection failure

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.7.2, 0.8.0
    • Fix Version/s: None
    • Component/s: core
    • Labels:

      Description

      Hi,

      I'm embedding the Kafka server (0.7.2) in an application container. I've noticed that if I try to start the server while ZooKeeper is unavailable, it gets a ZK connection timeout after the default 6 seconds and then throws an exception out of KafkaServer.startup(). E.g., I see this stack trace:

      Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 6000
      at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:876)
      at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:98)
      at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:84)
      at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:44)
      at kafka.log.LogManager.<init>(LogManager.scala:93)
      at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
      ....
      ....

      So that's OK: I can catch the exception and then shut everything down gracefully. However, when I do this, there is still a thread around that doesn't quit, so the server never actually exits the JVM. Specifically, this thread hangs around:

      "kafka-logcleaner-0" prio=5 tid=7fd9b48b1000 nid=0x112c08000 waiting on condition [112c07000]
      java.lang.Thread.State: TIMED_WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)
      - parking to wait for <7f40d4be8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
      at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
      at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
      at java.lang.Thread.run(Thread.java:680)

      Looking at the code in kafka.log.LogManager, it does seem to start the log-cleanup scheduler before trying to connect to ZK (which in this case fails):

      /* Schedule the cleanup task to delete old logs */
      if(scheduler != null) {
        info("starting log cleaner every " + logCleanupIntervalMs + " ms")
        scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
      }

      So this scheduler does not appear to be stopped if startup fails. However, if I catch the above RuntimeException, and then call KafkaServer.shutdown(), then it will stop the scheduler, and all is good.
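      The workaround described above can be sketched roughly as follows. This is only an illustration built on java.util.concurrent, not Kafka code: LeakyServer and WorkaroundDemo are hypothetical stand-ins, and the thrown RuntimeException merely simulates the ZooKeeper timeout. It assumes shutdown() is safe to call on a partially started server, which is exactly the point in question.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

// Hypothetical stand-in for a server whose startup() schedules work and then fails.
class LeakyServer {
  val scheduler = new ScheduledThreadPoolExecutor(1)

  def startup(): Unit = {
    // Scheduling the first task starts a (by default non-daemon) worker thread.
    scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = () }, 60, 60, TimeUnit.SECONDS)
    // Simulates the connection timeout that follows the scheduling step.
    throw new RuntimeException("Unable to connect to zookeeper server within timeout: 6000")
  }

  // Stops the scheduler so its worker thread can terminate.
  def shutdown(): Unit = scheduler.shutdownNow()
}

object WorkaroundDemo {
  // Returns true if startup succeeded; on failure, cleans up and returns false.
  def tryStart(server: LeakyServer): Boolean =
    try {
      server.startup()
      true
    } catch {
      case _: RuntimeException =>
        server.shutdown() // without this call the worker thread keeps the JVM alive
        false
    }
}
```

      With this pattern the caller both learns that startup failed and releases the scheduler thread, at the cost of having to remember the explicit shutdown() call.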

      However, it seems odd that, after getting an exception from KafkaServer.startup(), I should still have to call KafkaServer.shutdown(). Wouldn't it be better for startup() to clean up after itself internally if it hits an exception? I'm not sure I can reliably call shutdown() after a failed startup()....

        Activity

        Jun Rao added a comment -

        This problem exists in 0.8 too. What we need to do is add a try/catch in KafkaServer.startup() and call shutdown() if we hit any exception.
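        One way the suggested try/catch could look, as a sketch under stated assumptions rather than the actual KafkaServer code: SelfCleaningServer, the connect parameter, and schedulerStopped are hypothetical names, and a plain ScheduledThreadPoolExecutor stands in for the log-cleanup scheduler. The exception is rethrown so the caller still learns that startup failed, and shutdown() is guarded so that a second call is harmless.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical server whose startup() cleans up after itself on failure.
// `connect` stands in for the step that may throw (e.g. a ZooKeeper timeout).
class SelfCleaningServer(connect: () => Unit) {
  private val scheduler = new ScheduledThreadPoolExecutor(1)
  private val isShutDown = new AtomicBoolean(false)

  def startup(): Unit =
    try {
      scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = () }, 60, 60, TimeUnit.SECONDS)
      connect()
    } catch {
      case e: Throwable =>
        shutdown() // release whatever was started before the failure
        throw e    // rethrow so the caller knows startup failed
    }

  // Idempotent: only the first call actually stops the scheduler.
  def shutdown(): Unit =
    if (isShutDown.compareAndSet(false, true)) scheduler.shutdownNow()

  def schedulerStopped: Boolean = scheduler.isShutdown
}
```

        Making shutdown() idempotent is a design choice in this sketch: a caller that also calls shutdown() after catching the exception does no harm.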

        Swapnil Ghike added a comment -

        Hi Jason,

        Are you using KafkaServer.startup() or KafkaServerStartable.startup()? The latter calls the former and also shuts the server down in case of an exception.

        Jason Rosenberg added a comment -

        I was using KafkaServerStartable.startup(), but switched to KafkaServer.startup() because I wanted a bit more control, e.g. I want to be able to know if there was a problem within the container, and retry, etc. In KafkaServerStartable.startup(), if there's an exception, it swallows the exception and then calls shutdown(), so the caller has no idea whether the startup was successful.

        But I don't think that's relevant here. I think it's counterintuitive that KafkaServer.startup() would fail to start up, throw an exception, and then not clean up after itself, requiring a call to shutdown() in the first place.
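        The kind of control described here (knowing whether startup succeeded, and retrying) might be sketched as a small helper in the embedding container. Everything below is hypothetical: StartupRetry and startWithRetry are illustrative names, and start and stop stand in for whatever the container uses to start and shut down a server. Whether a server instance can be restarted after shutdown() is not established in this thread, so start may need to create a fresh instance on each attempt.

```scala
import scala.util.{Failure, Success, Try}

object StartupRetry {
  // Tries `start` up to `attemptsLeft` times. After each failed attempt, `stop`
  // is called to release whatever the attempt left running. The outcome is
  // returned to the caller instead of being swallowed.
  def startWithRetry(attemptsLeft: Int)(start: () => Unit, stop: () => Unit): Try[Unit] =
    Try(start()) match {
      case ok @ Success(_) => ok
      case failure @ Failure(_) =>
        stop()
        if (attemptsLeft > 1) startWithRetry(attemptsLeft - 1)(start, stop)
        else failure
    }
}
```

        The caller can then match on the returned Try to decide whether to continue, log the failure, or abort the container.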


          People

          • Assignee: Swapnil Ghike
          • Reporter: Jason Rosenberg
          • Votes: 0
          • Watchers: 4
