Kafka: KAFKA-212

IllegalThreadStateException in topic watcher for Kafka mirroring

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.1
    • Component/s: None
    • Labels:
      None

      Description

      When the Kafka mirroring embedded consumer receives a new topic watcher notification, it fails with the following exception:

      [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException (kafka.consumer.ZookeeperTopicEventWatcher)
      [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException
      at java.lang.Thread.start(Thread.java:595)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
      at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142)
      at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
      at scala.collection.immutable.List.foreach(List.scala:45)
      at kafka.server.EmbeddedConsumer.startNewConsumerThreads(KafkaServerStartable.scala:142)
      at kafka.server.EmbeddedConsumer.handleTopicEvent(KafkaServerStartable.scala:109)
      at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree2$1(ZookeeperTopicEventWatcher.scala:83)
      at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:78)
      at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
      at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
      (kafka.consumer.ZookeeperTopicEventWatcher)

      This happens because it tries to start a thread that has already finished executing.
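      In Java, Thread.start() may be called only once per Thread object; any later call throws IllegalThreadStateException, even after run() has returned. The minimal reproduction below is independent of Kafka, and the class and method names are hypothetical.

```java
// Minimal reproduction, independent of Kafka: a Thread object can be started at most once.
class RestartFinishedThread {
    static boolean restartThrows() {
        Thread worker = new Thread(() -> { /* finishes immediately */ });
        worker.start();
        try {
            worker.join(); // after this, run() has returned and the thread is TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        try {
            worker.start(); // second start() on the same, already finished Thread
            return false;
        } catch (IllegalThreadStateException e) {
            return true; // the exception reported in this issue
        }
    }

    public static void main(String[] args) {
        System.out.println("restart threw IllegalThreadStateException: " + restartThrows());
    }
}
```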

      Attachment: KAFKA-212.patch (0.7 kB, Neha Narkhede)

        Activity

        Neha Narkhede added a comment -

        This patch clears threadList, which holds the references to the older threads, before adding the new threads to it. This avoids calling start() on an already finished thread, and therefore avoids the IllegalThreadStateException.
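        The actual patch changes Scala code in KafkaServerStartable.scala and is not reproduced here. The following is only a hypothetical Java sketch of the same idea: stop the previous generation of threads, clear threadList, then create and start fresh Thread objects. The names threadList and startNewConsumerThreads echo the issue; interrupt() as the stop signal, the consumeLoop stand-in, and the class name are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java sketch of the fix; the real patch is Scala. Names other than
// threadList and startNewConsumerThreads are illustrative assumptions.
class EmbeddedConsumerSketch {
    private final List<Thread> threadList = new ArrayList<>();

    // Invoked on every topic-change notification from the topic watcher.
    void startNewConsumerThreads(int count) {
        stopAll(); // blocks until the previous generation of threads has exited
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(EmbeddedConsumerSketch::consumeLoop, "mirror-" + i);
            t.setDaemon(true); // demo only: never keep the JVM alive
            threadList.add(t);
        }
        for (Thread t : threadList) {
            t.start(); // the list holds only fresh Thread objects, so start() cannot throw
        }
    }

    void shutdown() {
        stopAll();
    }

    int threadCount() {
        return threadList.size();
    }

    private void stopAll() {
        for (Thread t : threadList) {
            t.interrupt(); // illustrative stop signal (assumption)
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while stopping threads", e);
            }
        }
        // The fix: forget finished threads so start() is never called on them again.
        threadList.clear();
    }

    // Stand-in for the consume-and-send loop; exits when interrupted.
    private static void consumeLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    public static void main(String[] args) {
        EmbeddedConsumerSketch consumer = new EmbeddedConsumerSketch();
        consumer.startNewConsumerThreads(2); // initial topics
        consumer.startNewConsumerThreads(3); // watcher fires again; no IllegalThreadStateException
        System.out.println("running threads: " + consumer.threadCount());
        consumer.shutdown();
    }
}
```

        Without the threadList.clear() call, the second startNewConsumerThreads call would call start() again on the two finished threads and fail exactly as in the stack trace.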

        Jay Kreps added a comment -

        Any thread that doesn't shut down cleanly will leak. Is that a problem? Can that happen?

        Neha Narkhede added a comment -

        The thread shutdown is guarded by a countdown latch: shutdown completes only after the thread exits the run() method. The problem here is that we keep references to the older, already shut down threads in that list and end up calling start() on them, which leads to the exception.
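        A latch-guarded shutdown of this kind might look roughly like the hypothetical Java sketch below; it is not Kafka's MirroringThread, and the running flag, the sleep stand-in, and the method names are assumptions. It also shows why a stale reference is harmful: once shutdown() has returned, calling start() on the same object throws.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical Java sketch of a latch-guarded worker thread; names are illustrative.
class LatchGuardedWorker extends Thread {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile boolean running = true;

    LatchGuardedWorker() {
        setDaemon(true); // demo only: never keep the JVM alive
    }

    @Override
    public void run() {
        try {
            while (running) {
                Thread.sleep(5); // stand-in for one consume-and-send iteration
            }
        } catch (InterruptedException e) {
            // treat interruption as a stop request
        } finally {
            shutdownLatch.countDown(); // run() is about to return
        }
    }

    // Blocks until run() has reached its finally block.
    void shutdown() {
        running = false;
        boolean interrupted = false;
        while (true) {
            try {
                shutdownLatch.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    boolean hasStopped() {
        return shutdownLatch.getCount() == 0;
    }

    public static void main(String[] args) {
        LatchGuardedWorker worker = new LatchGuardedWorker();
        worker.start();
        worker.shutdown(); // returns only once run() is finishing
        try {
            worker.start(); // reusing the stale reference: the failure mode in this issue
        } catch (IllegalThreadStateException e) {
            System.out.println("cannot restart a shut-down thread: " + e);
        }
    }
}
```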

        Neha Narkhede added a comment -

        Can somebody help review this?

        Jay Kreps added a comment -

        I don't think that answers my question, though: how do we know whether we are leaking threads? I guess the patch doesn't make this better or worse, since we definitely don't want to keep the old threads in the list, but can you assess what happens if shutdown fails? Can that happen? Do we log it? Or is there a guarantee that the thread will shut down within some bounded period of time?

        Neha Narkhede added a comment -

        >> which is how do we know if we are leaking threads?

        Good question. From what I see, the entire body of the thread's run() method is guarded by a try-catch-finally block, and the latch is counted down in the finally block. So if the thread runs into an exception or error, it exits the run() method and shuts itself down. The other shutdown path is when the mirroring thread itself calls the shutdown API; in that case, we wait until the current producer send operation succeeds before counting down the latch. In both cases, I don't see how we can leak threads.
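        The two shutdown paths described here can be sketched in Java as follows. This is a hypothetical illustration, not the Kafka code: the sendOnce callback stands in for one producer send, the failure field stands in for the FATAL log entry mentioned in this thread, and the class and method names are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the whole run() body sits in try-catch-finally, so the latch is
// counted down whether the loop ends via requestShutdown() or via an exception.
class SendLoopSketch implements Runnable {
    private final Runnable sendOnce; // stand-in for one producer send (assumption)
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile boolean running = true;
    private volatile Throwable failure; // stand-in for the FATAL log entry

    SendLoopSketch(Runnable sendOnce) {
        this.sendOnce = sendOnce;
    }

    @Override
    public void run() {
        try {
            while (running) {
                sendOnce.run(); // an in-flight send completes before the flag is re-checked
            }
        } catch (Throwable t) {
            failure = t; // per the discussion, the real code logs this as FATAL
        } finally {
            shutdownLatch.countDown(); // reached on both the shutdown and the error path
        }
    }

    // Requests a stop; the loop exits after the current send finishes.
    void requestShutdown() {
        running = false;
    }

    // Waits up to the given time for run() to reach its finally block.
    boolean exitedWithin(long millis) {
        try {
            return shutdownLatch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    Throwable failure() {
        return failure;
    }

    public static void main(String[] args) {
        SendLoopSketch failing = new SendLoopSketch(() -> { throw new RuntimeException("send failed"); });
        Thread t = new Thread(failing);
        t.setDaemon(true);
        t.start();
        System.out.println("exited: " + failing.exitedWithin(1000) + ", failure: " + failing.failure());
    }
}
```

        Note that if sendOnce blocked forever, neither path would reach the finally block, which matches the caveat about a hanging producer send raised below.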

        >> I guess the patch doesn't make it better or worse, since we definitely don't want to keep them in the list, but can you assess what happens if shutdown fails?

        Not true. The patch fixes the bug filed here. Your concerns are about the thread's shutdown logic; if you suspect a bug there, it can go in a separate JIRA.

        >> Do we log it?

        Yes. In any shutdown case, it gets logged as a FATAL error.

        >> Or is there a guarantee that the thread must shutdown in some bounded period of time?

        Maybe. Shutdown could block if the producer send operation hangs indefinitely, but that would be a serious bug in the producer send logic.

        Jun Rao added a comment -

        We shouldn't be leaking threads. If we reach the code that creates new MirrorThreads, the old threads must already have finished, since shutdown is blocking. If shutdown blocks forever, we never get to create new threads, so again there is no thread leak, although that would point to another serious bug somewhere else.

        +1 on the patch.

        Neha Narkhede added a comment -

        The fix is committed.


          People

          • Assignee: Neha Narkhede
          • Reporter: Neha Narkhede
          • Votes: 0
          • Watchers: 0
